Both Polars readers support snapshots via the following functions:

read_snapshots

The read_snapshots function reads the snapshot file for a given stream and returns it as either a Polars DataFrame or LazyFrame, or None if no snapshot exists for that stream.
  • Parameters:
    • stream: The name of the stream to read the snapshot for. For example, ‘contacts’.
    • snapshot_dir: The path to the directory where snapshot files are stored.
  • Example:
    # Example usage of read_snapshots
    import gluestick as gs

    reader = gs.PolarsReader()
    stream_name = 'contacts'
    snapshot_directory = '/path/to/snapshots'
    
    # Read snapshot for the 'contacts' stream
    snapshot_df = reader.read_snapshots(stream=stream_name, snapshot_dir=snapshot_directory)
    
    if snapshot_df is not None:
        print("Snapshot DataFrame:")
        print(snapshot_df)
    else:
        print("No snapshot available.")
    

snapshot_records

The snapshot_records function updates a snapshot file with new data for a given stream and returns a merged Polars DataFrame or LazyFrame.
  • Parameters:
    • stream_data: DataFrame (or Lazyframe) containing the new data to be merged.
    • stream: The name of the stream for the snapshots, e.g., ‘contacts’.
    • snapshot_dir: Path to the directory where snapshot files are stored.
    • pk: Primary key(s) to use when merging the snapshot; can be a single string or a list of strings.
    • just_new: If True, returns only the new/updated records; otherwise returns the full merged data.
    • use_csv: If True, saves and reads snapshots in CSV format instead of Parquet.
    • overwrite: If True, overwrites existing snapshot files instead of merging.
  • Example:
    import gluestick as gs
    
    # Update snapshots and export the merged data for each stream
    reader = gs.PolarsReader()
    snapshot_directory = '/path/to/snapshots'
    
    # The reader's string form evaluates to a dict of available streams,
    # so this iterates over every stream name in the input
    for stream in eval(str(reader)):
        # Get new data
        stream_data = reader.get(stream)
    
        # Update snapshot and get merged data
        merged_df = reader.snapshot_records(
            stream_data=stream_data,
            stream=stream,
            snapshot_dir=snapshot_directory,
            pk=reader.get_pk(stream),
            just_new=False,
            use_csv=False,
            overwrite=False
        )
    
        gs.to_export(merged_df, stream, "./etl-output", keys=[reader.get_pk(stream)])