Connector Details

Platform: Snowflake
Auth Type: API Keys
Direction: Bidirectional
Tap Repo: https://gitlab.com/hotglue/tap-snowflake
Target Repo: https://gitlab.com/hotglue/target-snowflake

Credentials Setup

The Snowflake connector uses the following 8 connection parameters (file_format applies to the target only):

{
  "user": "my_user",
  "account": "rtxxxxx.eu-central-1",
  "warehouse": "my_virtual_warehouse",
  "dbname": "database_name",
  "default_target_schema": "my_target_schema",
  "private_key": "-----BEGIN ENCRYPTED PRIVATE KEY-----.....",
  "private_key_password": "...",
  "file_format": "snowflake_file_format_object_name"
}
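
If you want to sanity-check these values before linking the connector, the minimal sketch below can confirm nothing is missing. It assumes you keep the values in a local config.json; the file name and the check itself are illustrative, not part of hotglue.

import json

# Keys the Snowflake connector expects; file_format only matters for the target,
# and private_key_password only if your key is encrypted.
REQUIRED_KEYS = ["user", "account", "warehouse", "dbname", "default_target_schema", "private_key"]
OPTIONAL_KEYS = ["private_key_password", "file_format"]

with open("config.json") as f:
    config = json.load(f)

missing = [key for key in REQUIRED_KEYS if not config.get(key)]
if missing:
    raise ValueError(f"config.json is missing: {', '.join(missing)}")
print("All required Snowflake connection parameters are present.")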

Follow the steps below to find each of these in Snowflake.

1. user

If you do not already have a Snowflake account, you can create one on the Snowflake website.

This is the username that should be used when linking the Snowflake connector.

2. account

Once signed in, you’ll be taken to a homepage with a URL that looks like:

https://app.snowflake.com/<ORG_ID>/<ACCOUNT_ID>/#/homepage

For the account field, enter "ORG_ID-ACCOUNT_ID" (the two values joined by a hyphen).

For example, if this is your URL:

https://app.snowflake.com/rybaofs/ftb21143/#/homepage

Your account parameter should be set to rybaofs-ftb21143.
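
If you prefer to derive this value programmatically, the short sketch below (the helper name is illustrative) pulls ORG_ID and ACCOUNT_ID out of the Snowsight homepage URL:

from urllib.parse import urlparse

def account_from_snowsight_url(url: str) -> str:
    # The Snowsight path looks like /<ORG_ID>/<ACCOUNT_ID>/#/homepage
    org_id, account_id = urlparse(url).path.strip("/").split("/")[:2]
    return f"{org_id}-{account_id}"

print(account_from_snowsight_url("https://app.snowflake.com/rybaofs/ftb21143/#/homepage"))
# prints: rybaofs-ftb21143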

3. warehouse

You can find a list of valid warehouses under Admin > Warehouses.

Use the name of your primary, active warehouse for the warehouse field.

4. dbname and default_target_schema

Navigate to Data > Databases, and click the Add Database button in the top right corner.

You can now create the new database, noting its name for the dbname field.

Next, select your new database and open the Schema tab in the secondary navigation bar. In this example the database is called Target Test, and within it you can see the schemas available for that database. Enter the database name as the dbname field and the desired schema as the default_target_schema field in hotglue.

5. file_format

Next, you need to create a named file format in your Snowflake schema. This will be used by the MERGE/COPY commands to parse the files correctly from S3.

To use CSV files you can create a file format using the command:

CREATE FILE FORMAT {database}.{schema}.{file_format_name}
TYPE = 'CSV' ESCAPE='\\' FIELD_OPTIONALLY_ENCLOSED_BY='"';

You should use your chosen file format name, typically fully qualified as {database}.{schema}.{file_format_name}, as the file_format connection parameter.

6. Authenticating with key-pair authentication

First, generate your private key by running the following command in your local terminal:

openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt

If you wish to use an encrypted key, run the command with an encryption algorithm (for example -v2 des3) in place of the -nocrypt flag:

openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 des3 -inform PEM -out rsa_key.p8

If you are using an encrypted key, enter your key’s password into the private_key_password field.

Either command generates a private key in PEM format (if you used -nocrypt, the header reads BEGIN PRIVATE KEY rather than BEGIN ENCRYPTED PRIVATE KEY):

-----BEGIN ENCRYPTED PRIVATE KEY-----
MIIE6T...
-----END ENCRYPTED PRIVATE KEY-----

You can copy and paste the entire key directly into hotglue as the private_key parameter.

Next, generate a public key for your private key with:

openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
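
The ALTER USER command below expects only the base64 body of the public key, with the BEGIN/END lines and line breaks removed. A small sketch to extract that value from rsa_key.pub:

# Strip the PEM header/footer and newlines, leaving the base64 body
# that Snowflake expects for RSA_PUBLIC_KEY.
with open("rsa_key.pub") as f:
    body = "".join(line.strip() for line in f if "PUBLIC KEY" not in line)
print(body)  # paste this value into the ALTER USER statement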

Finally, execute the following command in Snowflake to associate your public key with your user profile:

ALTER USER example_user SET RSA_PUBLIC_KEY='MIIBIjANBgkqh...';
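
To confirm that key-pair authentication works end to end before linking the connector, you can open a session with the Snowflake Python connector. This is a minimal sketch that assumes snowflake-connector-python and cryptography are installed; all values are placeholders from the earlier steps, and the password should be None if you generated an unencrypted key.

import snowflake.connector
from cryptography.hazmat.primitives import serialization

# Load the private key generated earlier (use password=None for an unencrypted key)
with open("rsa_key.p8", "rb") as f:
    p_key = serialization.load_pem_private_key(f.read(), password=b"my_key_password")

# The Python connector expects the key as DER-encoded bytes
private_key_der = p_key.private_bytes(
    encoding=serialization.Encoding.DER,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption(),
)

conn = snowflake.connector.connect(
    user="my_user",
    account="rybaofs-ftb21143",
    private_key=private_key_der,
    warehouse="my_virtual_warehouse",
    database="database_name",
    schema="my_target_schema",
)
print(conn.cursor().execute("SELECT CURRENT_USER(), CURRENT_WAREHOUSE()").fetchone())
conn.close()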

Target Snowflake

Target Snowflake creates Snowflake tables and uploads data for every stream passed to it by the ETL output.

Example ETL Script

import gluestick as gs
import os

# Define standard hotglue directories
ROOT_DIR = os.environ.get("ROOT_DIR", ".")
INPUT_DIR = f"{ROOT_DIR}/sync-output"
OUTPUT_DIR = f"{ROOT_DIR}/etl-output"


# Read sync output
input = gs.Reader()

# Get tenant id
tenant_id = os.environ.get('USER_ID', os.environ.get('TENANT', 'default'))

# Iterate through the different streams in the sync output
for key in eval(str(input)):
    input_df = input.get(key)

    """
    Here we get the metadata for extracting the key properties, such as primary and replication keys.
    The database export targets will utilize these primary keys when upserting data.
    If you wish to hardcode your choice of primary keys, you can do so here.
    """
    metadata = input.get_metadata(key)
    if metadata.get("key_properties"):
        # Use the key_properties (e.g. primary and replication keys) from the catalog
        key_properties = eval(metadata["key_properties"])
    else:
        key_properties = []

    # Include tenant_id as a field if desired
    input_df["tenant"] = tenant_id

    # Write this stream to the OUTPUT directory with the specified key_properties
    gs.to_singer(input_df, key, OUTPUT_DIR, keys=key_properties)

Available config flags

batch_size_rows (Integer, default: 100000): Maximum number of rows in each batch. At the end of each batch, the rows in the batch are loaded into Snowflake.
batch_wait_limit_seconds (Integer, default: None): Maximum time to wait for a batch to reach batch_size_rows.
flush_all_streams (Boolean, default: False): Flush and load every stream into Snowflake when one batch is full. Warning: this may trigger the COPY command to use files with a low number of records and may cause performance problems.
parallelism (Integer, default: 0): The number of threads used to flush tables. 0 creates a thread for each stream, up to parallelism_max. -1 creates a thread for each CPU core. Any other positive number creates that number of threads, up to parallelism_max.
parallelism_max (Integer, default: 16): Maximum number of parallel threads to use when flushing tables.
primary_key_required (Boolean, default: True): Log-based and incremental replications on tables with no primary key cause duplicates when merging UPDATE events. When set to true, stop loading data if no primary key is defined.
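
Purely as an illustration, if you run target-snowflake locally outside of hotglue (as a standard Singer target), a config file combining the connection parameters from Credentials Setup with a few of these optional flags could be assembled like this; every value below is a placeholder:

import json

config = {
    "user": "my_user",
    "account": "rybaofs-ftb21143",
    "warehouse": "my_virtual_warehouse",
    "dbname": "database_name",
    "default_target_schema": "my_target_schema",
    "private_key": "-----BEGIN ENCRYPTED PRIVATE KEY-----\n...",
    "private_key_password": "...",
    "file_format": "database_name.my_target_schema.my_csv_format",
    # Optional flags; the defaults are listed above
    "batch_size_rows": 100000,
    "flush_all_streams": False,
    "parallelism": 0,
    "parallelism_max": 16,
    "primary_key_required": True,
}

with open("config.json", "w") as f:
    json.dump(config, f, indent=2)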