Connector Details

NameValue
PlatformPostgreSQL
Auth TypeAPI Keys
DirectionBidirectional
Tap Repohttps://gitlab.com/hotglue/tap-postgres
Target Repohttps://github.com/hotgluexyz/pipelinewise-target-postgres
Tap Metrics

Usage:

Target Metrics

Usage:

Target Postgres

Config

target-postgres requires the standard 6 connection parameters to be specified in the config:

{
    "host": "https://...",
    "port": "5432",
    "user": "...",
    "password": "...",
    "dbname": "...",
    "default_target_schema": "public"
}

Example ETL Script

import gluestick as gs
import os

# Define standard directories of hot
ROOT_DIR = os.environ.get("ROOT_DIR", ".")
INPUT_DIR = f"{ROOT_DIR}/sync-output"
OUTPUT_DIR = f"{ROOT_DIR}/etl-output"

# Read sync output
input = gs.Reader()

# Get tenant id
tenant_id = os.environ.get('USER_ID', os.environ.get('TENANT', 'default'))

# Iterate through the different streams in the sync output
for key in eval(str(input)):
    input_df = input.get(key)

    """
    Here we get the metadata for extracting the key properties, such as primary and replication keys.
    The database export targets will utilize these primary keys when upserting data.
    If you wish to hardcode your choice of primary keys, you can do so here.
    """
    metadata = input.get_metadata(key)
    if metadata.get("key_properties"):
        # Use the key_properties (e.g. primary and replication keys) from the catalog
        key_properties = eval(metadata["key_properties"])
    else:
        key_properties = []

    # Include tenant_id as a field if desired
    input_df["tenant"] = tenant_id

    # Write this stream to the OUTPUT directory with the specified key_properties
    gs.to_singer(input_df, key, OUTPUT_DIR, keys=key_properties)

Optional config flags

PropertyTypeDescription
sslBoolean(Default: False) Enable SSL for the connection to Postgres.
insertion_methodString(Default: ‘incremental’) Currently only handles ‘truncate’ (drop and re-create table before adding rows) and ‘incremental’ (upsert new rows)
batch_size_rowsInteger(Default: 100000) Maximum number of rows in each batch. At the end of each batch, the rows in the batch are loaded into Postgres.
flush_all_streamsBoolean(Default: False) Flush and load every stream into Postgres when one batch is full. Warning: This may trigger the COPY command to use files with low number of records.
parallelismInteger(Default: 0) The number of threads used to flush tables. 0 will create a thread for each stream, up to parallelism_max. -1 will create a thread for each CPU core. Any other positive number will create that number of threads, up to parallelism_max.
max_parallelismInteger(Default: 16) Max number of parallel threads to use when flushing tables.
default_target_schemaStringName of the schema where the tables will be created. If schema_mapping is not defined then every stream sent by the tap is loaded into this schema.
default_target_schema_select_permissionStringGrant USAGE privilege on newly created schemas and grant SELECT privilege on newly created
schema_mappingObjectUseful if you want to load multiple streams from one tap to multiple Postgres schemas. If the tap sends the stream_id in <schema_name>-<table_name> format then this option overwrites the default_target_schema value. Note, that using schema_mapping you can overwrite the default_target_schema_select_permission value to grant SELECT permissions to different groups per schemas or optionally you can create indices automatically for the replicated tables. Note: This is an experimental feature and recommended to use via PipelineWise YAML files that will generate the object mapping in the right JSON format. For further info check a PipelineWise YAML Example.
add_metadata_columnsBoolean(Default: False) Metadata columns add extra row level information about data ingestions, (i.e. when was the row read in source, when was inserted or deleted in postgres etc.) Metadata columns are creating automatically by adding extra columns to the tables with a column prefix _SDC_. The column names are following the stitch naming conventions documented at https://www.stitchdata.com/docs/data-structure/integration-schemas#sdc-columns. Enabling metadata columns will flag the deleted rows by setting the _SDC_DELETED_AT metadata column. Without the add_metadata_columns option the deleted rows from singer taps will not be recognisable in Postgres.
hard_deleteBoolean(Default: False) When hard_delete option is true then DELETE SQL commands will be performed in Postgres to delete rows in tables. It’s achieved by continuously checking the _SDC_DELETED_AT metadata column sent by the singer tap. Due to deleting rows requires metadata columns, hard_delete option automatically enables the add_metadata_columns option as well.
data_flattening_max_levelInteger(Default: 0) Object type RECORD items from taps can be transformed to flattened columns by creating columns automatically. When value is 0 (default) then flattening functionality is turned off.
primary_key_requiredBoolean(Default: True) Log based and Incremental replications on tables with no Primary Key cause duplicates when merging UPDATE events. When set to true, stop loading data if no Primary Key is defined.
validate_recordsBoolean(Default: False) Validate every single record message to the corresponding JSON schema. This option is disabled by default and invalid RECORD messages will fail only at load time by Postgres. Enabling this option will detect invalid records earlier but could cause performance degradation.
temp_dirString(Default: platform-dependent) Directory of temporary CSV files with RECORD messages.
insertion_method_tablesArray(String)(Default: []) Tables to apply “insertion method” to. Has no effect if insertion method not specified.

Target Changelog