snapshot_records

Creates or updates a data snapshot for a stream: if a snapshot already exists, the new data is merged into it; otherwise a new snapshot is created. Handles both CSV and Parquet formats, with optional type coercion.

Import

from gluestick.etl_utils import snapshot_records

Basic Usage

# Create or update a snapshot for the users table
updated_data = snapshot_records(
    stream_data,
    "users",
    SNAPSHOT_DIR,
    pk="user_id"
)

Parameters

  • stream_data (DataFrame): New data to be included in the snapshot
  • stream (str): Name of the stream
  • snapshot_dir (str): Directory for storing snapshots
  • pk (str|list): Primary key(s) for deduplication
  • just_new (bool): Return only the new records instead of the full snapshot if True
  • use_csv (bool): Use CSV format instead of Parquet
  • coerce_types (bool): Coerce the stored snapshot's column types to match those of stream_data
  • **kwargs: Additional pandas read options

Returns

pandas DataFrame containing the complete snapshot, or only the new records when just_new=True
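
For example, the optional flags can be combined to return only the newly seen rows while aligning the stored snapshot's types with the incoming data (a sketch using only the parameters above; stream_data and SNAPSHOT_DIR are placeholders, as in Basic Usage):

# Return only records not already in the snapshot, coercing the
# stored snapshot's column types to match stream_data
new_records = snapshot_records(
    stream_data,
    "users",
    SNAPSHOT_DIR,
    pk="user_id",
    just_new=True,
    coerce_types=True
)

# Store the snapshot as CSV instead of the default Parquet
updated_data = snapshot_records(
    stream_data,
    "users",
    SNAPSHOT_DIR,
    pk="user_id",
    use_csv=True
)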


read_snapshots

Reads an existing snapshot into a pandas DataFrame.

Usage

from gluestick.etl_utils import read_snapshots

# Read existing snapshot
snapshot_df = read_snapshots(
    "users",
    SNAPSHOT_DIR
)

Parameters

  • stream (str): Name of the stream to read
  • snapshot_dir (str): Directory containing snapshots
  • **kwargs: Additional pandas read options

Returns

pandas DataFrame containing snapshot data, or None if no snapshot exists
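
Because the function returns None when no snapshot exists yet (e.g. on the first job run), it is worth guarding for that case. A minimal sketch, with SNAPSHOT_DIR as a placeholder:

import pandas as pd
from gluestick.etl_utils import read_snapshots

snapshot_df = read_snapshots("users", SNAPSHOT_DIR)
if snapshot_df is None:
    # First run: no snapshot on disk yet, start from an empty frame
    snapshot_df = pd.DataFrame()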


drop_redundant

Removes duplicate rows by hashing row content and comparing the hashes against a persisted snapshot of previously processed data.
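
Conceptually, the mechanism looks like the sketch below (an illustration of content hashing only, not gluestick's internal code; gluestick persists the seen hashes under output_dir between runs):

import pandas as pd

# Hash each row's content, keep only rows whose hash has not been
# seen before, then remember the new hashes for the next run
df = pd.DataFrame({"user_id": [1, 2], "email": ["a@x.com", "b@x.com"]})
hashes = pd.util.hash_pandas_object(df, index=False)
seen = set()  # previously seen hashes; persisted between runs
fresh = df[~hashes.isin(seen)]
seen.update(hashes)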

Usage

from gluestick.etl_utils import drop_redundant

# Basic deduplication
clean_df = drop_redundant(
    df,
    "users",
    OUTPUT_DIR,
    pk="user_id"
)

# Track updated records
clean_df = drop_redundant(
    df,
    "users",
    OUTPUT_DIR,
    pk=["user_id", "email"],
    updated_flag=True
)

Parameters

  • df (DataFrame): DataFrame to check for duplicates
  • name (str): Name for snapshot hash file
  • output_dir (str): Directory to save state files
  • pk (str|list): Primary key(s) for state tracking
  • updated_flag (bool): Add a flag column marking new/updated rows if True
  • use_csv (bool): Use CSV format instead of Parquet

Returns

pandas DataFrame with redundant rows removed
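
A typical follow-up is to export only the rows that survived deduplication (a sketch; OUTPUT_DIR and the output path are placeholders):

# Deduplicate, then write out only the rows not seen in earlier runs
clean_df = drop_redundant(df, "users", OUTPUT_DIR, pk="user_id")
if not clean_df.empty:
    clean_df.to_parquet(f"{OUTPUT_DIR}/users.parquet", index=False)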


Common Patterns

Incremental Processing

etl.py
import os

import gluestick as gs

# Snapshot directory (hotglue convention; adjust to your environment)
SNAPSHOT_DIR = f"{os.environ.get('ROOT_DIR', '.')}/snapshots"

# Process only new records
reader = gs.Reader()
for stream in eval(str(reader)):  # str(reader) lists the available stream names
    # Get new data
    input_df = reader.get(stream)
    
    # Combine new data with previously snapshotted data
    all_records = gs.snapshot_records(
        input_df,
        stream,
        SNAPSHOT_DIR,
        pk=reader.get_pk(stream)
    )
    row_count = all_records.shape[0]
    print(f"Total number of records across jobs: {row_count}")