clean_convert

Recursively cleans None values from lists and dictionaries, handling nested structures and converting datetime objects to ISO format.

Usage

from gluestick.etl_utils import clean_convert

Basic Usage

# Clean dictionary
cleaned_dict = clean_convert({
    'name': 'John',
    'age': None,
    'scores': [10, None, 20]
})

# Clean nested structures
cleaned_nested = clean_convert({
    'user': {
        'name': 'John',
        'details': [
            {'score': 10},
            {'score': None}
        ]
    }
})

Parameters

  • input (dict, list): Input data structure to clean
  • Handles:
    • Dictionaries
    • Lists
    • Datetime objects
    • Scalar values

Returns

Cleaned data structure with None values removed
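The cleaning logic described above can be sketched as a small recursive function. This is an illustrative reimplementation based on the behavior documented here (drop None values from dicts and lists, convert datetimes to ISO strings, recurse into nested structures), not gluestick's actual source:

```python
from datetime import date, datetime

def clean_convert_sketch(value):
    # Illustrative sketch of the documented behavior; the real
    # gluestick implementation may differ in details.
    if isinstance(value, dict):
        # Drop keys whose value is None, recurse into the rest
        return {k: clean_convert_sketch(v)
                for k, v in value.items() if v is not None}
    if isinstance(value, list):
        # Drop None elements, recurse into the rest
        return [clean_convert_sketch(v) for v in value if v is not None]
    if isinstance(value, (datetime, date)):
        # Convert datetime/date objects to ISO format strings
        return value.isoformat()
    return value

cleaned = clean_convert_sketch({'name': 'John', 'age': None, 'scores': [10, None, 20]})
# → {'name': 'John', 'scores': [10, 20]}
```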


map_fields

Maps row values according to a specified mapping dictionary, supporting nested structures and arrays.

Usage

from gluestick.etl_utils import map_fields

# Basic field mapping
mapping = {
    'user_name': 'name',
    'user_age': 'age'
}
result = map_fields(row, mapping)

# Nested mapping
mapping = {
    'user': {
        'fullName': 'name',
        'age': 'age'
    },
    'scores': ['test_scores']
}
result = map_fields(row, mapping)

Parameters

  • row (dict): Source data row
  • mapping (dict): Mapping configuration
    • Keys: Target field names
    • Values: Source field names or nested mappings

Returns

Dictionary with mapped values
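To make the mapping rules concrete, here is a rough sketch of how such a mapper could behave, given the parameter description above (keys are target field names, values are source field names, nested dicts, or lists of source fields). This is a hypothetical reimplementation for illustration, not the library's code:

```python
def map_fields_sketch(row, mapping):
    # Hypothetical sketch of the documented mapping behavior.
    output = {}
    for target, source in mapping.items():
        if isinstance(source, dict):
            # Nested mapping: build a nested dict from the same row
            output[target] = map_fields_sketch(row, source)
        elif isinstance(source, list):
            # List of source fields: collect each value into an array
            output[target] = [row.get(s) for s in source]
        else:
            # Plain field: copy the source value under the target name
            output[target] = row.get(source)
    return output

row = {'name': 'John', 'age': 30, 'test_scores': [10, 20]}
map_fields_sketch(row, {'user_name': 'name', 'user_age': 'age'})
# → {'user_name': 'John', 'user_age': 30}
```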


rename

Renames DataFrame columns according to a mapping dictionary, or selects a subset of columns when given a list.

Usage

from gluestick.pandas_utils import rename

# Basic column renaming
new_df = rename(df, {
    'old_name': 'new_name',
    'previous': 'current'
})

# Select specific columns
new_df = rename(df, ['col1', 'col2', 'col3'])

Parameters

  • df (pd.DataFrame): Input DataFrame
  • target_columns (dict, list):
    • dict: Mapping of old to new column names
    • list: Columns to select

Returns

DataFrame with renamed columns

Notes

  • Supports both renaming and column selection
  • Preserves data types
  • Returns original DataFrame if no mapping provided
  • Only renames existing columns
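The two calling conventions above map closely onto pandas built-ins, which can help clarify the expected results. This sketch uses plain pandas rather than gluestick itself; `pd.DataFrame.rename` already ignores mapping keys that do not match an existing column, consistent with the notes above:

```python
import pandas as pd

df = pd.DataFrame({
    'old_name': [1, 2],
    'previous': ['a', 'b'],
    'extra': [0, 0],
})

# dict form: rename matching columns; 'missing' is silently ignored
renamed = df.rename(columns={'old_name': 'new_name',
                             'previous': 'current',
                             'missing': 'ignored'})

# list form: plain column selection
selected = df[['old_name', 'previous']]
```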

localize_datetime

Converts DataFrame datetime columns to UTC timezone, handling both naive and timezone-aware timestamps.

Usage

from gluestick.etl_utils import localize_datetime

# Localize single column
df['timestamp'] = localize_datetime(df, 'timestamp')

# Process multiple columns
for col in ['created_at', 'updated_at']:
    df[col] = localize_datetime(df, col)

Parameters

  • df (pd.DataFrame): Input DataFrame
  • column_name (str): Name of datetime column to localize

Returns

Series with localized datetime values
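The naive-versus-aware distinction mentioned above is the interesting part of this function. The stdlib sketch below illustrates one plausible convention (naive timestamps are assumed to already be in UTC, aware ones are converted to UTC); gluestick's actual handling may differ:

```python
from datetime import datetime, timezone

def to_utc_sketch(dt):
    # Assumption for this sketch: naive timestamps are already UTC,
    # so they are tagged rather than shifted.
    if dt.tzinfo is None:
        return dt.replace(tzinfo=timezone.utc)
    # Aware timestamps are converted to the equivalent UTC instant.
    return dt.astimezone(timezone.utc)
```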


deep_convert_datetimes

Recursively transforms all datetime objects to ISO format strings within nested data structures.

Usage

from gluestick.singer import deep_convert_datetimes

# Convert nested structure
converted = deep_convert_datetimes({
    'created': datetime.now(),
    'items': [
        {'date': datetime.now()},
        {'date': datetime.now()}
    ]
})

# Convert list of records
converted_list = deep_convert_datetimes([
    {'timestamp': datetime.now()},
    {'timestamp': datetime.now()}
])

Parameters

  • value (any): Input value or data structure
    • Handles dictionaries, lists, datetime objects
    • Processes nested structures recursively

Returns

Data structure with datetime objects converted to ISO format strings

Notes

  • Uses "%Y-%m-%dT%H:%M:%S.%fZ" format
  • Handles both datetime and date objects
  • Preserves original data structure
  • Safe for non-datetime values
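Putting the notes above together, a minimal sketch of such a converter looks like this. It is an illustration of the documented behavior (the stated format string for datetimes, ISO format for dates, recursion, pass-through for everything else), not gluestick's implementation:

```python
from datetime import date, datetime

ISO_FMT = "%Y-%m-%dT%H:%M:%S.%fZ"  # format stated in the notes above

def deep_convert_sketch(value):
    # Illustrative sketch; the real implementation may differ.
    if isinstance(value, dict):
        return {k: deep_convert_sketch(v) for k, v in value.items()}
    if isinstance(value, list):
        return [deep_convert_sketch(v) for v in value]
    if isinstance(value, datetime):
        return value.strftime(ISO_FMT)
    if isinstance(value, date):
        return value.isoformat()
    # Non-datetime scalars pass through unchanged
    return value
```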

exception

Provides standardized error handling and logging for ETL pipelines.

Usage

from gluestick.etl_utils import exception

try:
    # Your ETL code here
    process_data()
except ValueError as e:
    exception(e, ROOT_DIR, "Error processing data")

# With additional context
try:
    transform_data()
except Exception as e:
    exception(
        e,
        ROOT_DIR,
        f"Failed processing stream {stream_name}"
    )

Parameters

  • exception (Exception): Caught exception
  • root_dir (str): Directory for error log
  • error_message (str): Additional context message

Notes

  • Creates consistent error format
  • Logs errors to 'errors.txt'
  • Preserves original exception details
  • Adds contextual information
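As a rough sketch of the behavior the notes describe (append the context message and exception details to errors.txt under the given directory), something like the following could serve; the function name, log format, and signature here are hypothetical, not gluestick's actual implementation:

```python
import os
import traceback

def exception_sketch(exc, root_dir, error_message=""):
    # Hypothetical error logger mirroring the documented behavior:
    # append context plus exception details to errors.txt in root_dir.
    log_path = os.path.join(root_dir, "errors.txt")
    with open(log_path, "a") as f:
        f.write(f"{error_message}: {type(exc).__name__}: {exc}\n")
        f.write(traceback.format_exc())
```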

Common Patterns

Processing DateTime Data

import gluestick as gs
from datetime import datetime

def process_timestamps(df):
    # Localize timezone columns
    datetime_cols = ['created_at', 'updated_at']
    for col in datetime_cols:
        df[col] = gs.localize_datetime(df, col)
    
    # Process nested data with timestamps
    df['metadata'] = df['metadata'].apply(gs.deep_convert_datetimes)
    
    return df

# Clean and map data with timestamps
def transform_data(input_data):
    # First clean None values and convert dates
    cleaned = gs.clean_convert(input_data)
    
    # Define mapping with timestamp fields
    mapping = {
        'user': {
            'created': 'created_at',
            'modified': 'updated_at'
        }
    }
    
    # Map and process fields
    return gs.map_fields(cleaned, mapping)