Background

A target processes data and writes it to an API or database. It contains the logic to authenticate, handle errors, and output a summary of the processed records.

Initial Setup

Install the cookiecutter Python package via pip

pip install cookiecutter

Download the hotglue target SDK template by running this command:

cookiecutter git@gitlab.com:hotglue/target-hotglue-sdk.git --directory="cookiecutter/target-template"

When prompted, fill destination_name with the target name (capitalized), enter the author name, select “Per record” as the serialization method, and choose the authorization method (OAuth2 or Bearer token).

Enter the newly created folder and install the package locally in editable mode, so your changes are reflected immediately when testing:

pip install -e .

Target structure

The folders and file structure of the target should look like this:

.secrets/ # contains all the input (config.json, data.singer) and output (target-state.json) files
target_example/
    auth.py # logic to authenticate to be able to send requests to the API
    client.py # base sink attributes and methods
    sinks.py # custom sinks for each endpoint
    target.py # target class and main file.

target.py

The target class is the entry point of the target execution. Target name, available sinks and config fields should be defined here.

Define the config.json schema

In target.py, define all required values in the config file, such as username, password, api_key, client_id, etc.

class TargetExample(TargetHotglue):
    config_jsonschema = th.PropertiesList(
        th.Property("username", th.StringType, required=True),
        th.Property("password", th.StringType, required=True),
        th.Property(
            "url_base",
            th.StringType,
        ),
    ).to_dict()
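
To make the effect of required=True concrete, the sketch below checks a loaded config dict for missing required keys. It is a plain-Python illustration and not the SDK's own validation; the helper name missing_required_keys and the REQUIRED_KEYS tuple are hypothetical and simply mirror the schema above.

```python
import json

# Hypothetical: mirrors the properties marked required=True in config_jsonschema
REQUIRED_KEYS = ("username", "password")


def missing_required_keys(config: dict, required=REQUIRED_KEYS) -> list:
    """Return the required keys that are absent or empty in the config."""
    return [key for key in required if not config.get(key)]


config = json.loads('{"username": "user_12345&", "url_base": "https://example.com"}')
print(missing_required_keys(config))  # ['password']
```

A check like this can be handy in a local test script to catch an incomplete config.json before running the target.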

List the available sinks

Import all sinks into target.py and list them in SINK_TYPES (see the sinks.py section below for how to create a sink).

from target_example.sinks import ItemsSink, PurchaseOrdersSink, VendorsSink


class TargetExample(TargetHotglue):
    SINK_TYPES = [VendorsSink, ItemsSink, PurchaseOrdersSink]
    config_jsonschema = th.PropertiesList(
        th.Property("username", th.StringType, required=True),
        th.Property("password", th.StringType, required=True),
        th.Property(
            "url_base",
            th.StringType,
        ),
    ).to_dict()

client.py

client.py contains all the common functions and properties, such as the API base URL and the authentication logic, within a base sink. You can override any of the prebuilt functions here and the change will apply to all sinks.

The base sink should inherit from one of these Hotglue sinks:

  • For Per record serialization method it should inherit from HotglueSink
  • For Per batch serialization method it should inherit from HotglueBatchSink

Common Attributes

  • base_url: The base url of the API
  • http_headers: Any header that should be sent when making the request.
  • MAX_SIZE_DEFAULT: When making a target with the per batch serialization method, this attribute defines the number of records that will be processed and passed into the make_batch_request function.
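
As an illustration of what a batch size limit means in practice, the sketch below splits a list of records into batches of at most MAX_SIZE_DEFAULT items. It is a standalone model of the idea, not the SDK's batching code; iter_batches is a hypothetical helper and the value 3 is chosen only to keep the demo small.

```python
from typing import Iterable, Iterator, List

MAX_SIZE_DEFAULT = 3  # hypothetical batch size, chosen small for the demo


def iter_batches(
    records: Iterable[dict], max_size: int = MAX_SIZE_DEFAULT
) -> Iterator[List[dict]]:
    """Yield lists of at most max_size records, preserving order."""
    batch: List[dict] = []
    for record in records:
        batch.append(record)
        if len(batch) == max_size:
            yield batch
            batch = []
    if batch:  # flush the final, possibly smaller batch
        yield batch


records = [{"id": i} for i in range(7)]
batch_sizes = [len(batch) for batch in iter_batches(records)]
print(batch_sizes)  # [3, 3, 1]
```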

Authentication

The authentication should be defined in the base sink. Simple logic can be added there directly; for cases like OAuth, it is good practice to create an auth.py file containing the logic.

These are some of the common authentication methods used and how they can be added to the base sink.

Basic Authentication

The basic auth logic can be added as a property or function in the base sink and passed in the headers.

from base64 import b64encode


class ExampleBaseSink(HotglueSink):

    base_url = "https://www.my_example_api.com"

    @property
    def authenticator(self):
        user = self.config.get("username")
        passwd = self.config.get("password")
        token = b64encode(f"{user}:{passwd}".encode()).decode()
        return f"Basic {token}"

    @property
    def http_headers(self): # any headers that should be sent
        return {
            "Authorization": self.authenticator
        }
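
To see what the Authorization value looks like, here is a small standalone sketch of the same encoding step. The function name basic_auth_header and the sample credentials are made up for illustration.

```python
from base64 import b64decode, b64encode


def basic_auth_header(user: str, passwd: str) -> str:
    """Build the value of the Authorization header for HTTP Basic auth."""
    token = b64encode(f"{user}:{passwd}".encode()).decode()
    return f"Basic {token}"


header = basic_auth_header("user", "pass")
print(header)  # Basic dXNlcjpwYXNz

# Decoding the token recovers the original "user:password" pair
print(b64decode(header.split(" ", 1)[1]).decode())  # user:pass
```

Note that Base64 is an encoding, not encryption, so this header should only be sent over HTTPS.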

API Key authentication

The API key can be passed directly in the headers.

class ExampleBaseSink(HotglueSink):

    base_url = "https://www.my_example_api.com" 

    @property
    def http_headers(self): # any headers that should be sent
        return {
            "Api-Key": self.config.get("api_key") # assuming the api_key is present in the config file
        }

OAuth Authentication

For OAuth, an authentication class can be created in a separate file and then imported into the base sink.

class ExampleSink(HotglueSink):

    @property
    def base_url(self) -> str:
        return "https://api.url.com/rest/v1/"

    @property
    def authenticator(self):
        # basic authentication shown for brevity; with OAuth, use the class from auth.py instead
        user = self.config.get("username")
        passwd = self.config.get("password")
        token = b64encode(f"{user}:{passwd}".encode()).decode()
        return f"Basic {token}"
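
The sketch below illustrates one way a base sink might delegate to a separate authenticator class and merge its headers into http_headers. StubAuthenticator and StubBaseSink are stand-ins written for this example so it runs without the SDK; in a real target the sink would inherit from HotglueSink and the authenticator would live in auth.py. The Content-Type header is also an assumption.

```python
class StubAuthenticator:
    """Stand-in for an OAuth authenticator class defined in auth.py."""

    def __init__(self, config: dict) -> None:
        self._config = config

    @property
    def auth_headers(self) -> dict:
        # A real authenticator would refresh the token here when it has expired
        return {"Authorization": f"Bearer {self._config.get('access_token')}"}


class StubBaseSink:
    """Stand-in for the base sink; a real one would inherit from HotglueSink."""

    base_url = "https://api.url.com/rest/v1/"

    def __init__(self, config: dict) -> None:
        self.config = config

    @property
    def authenticator(self) -> StubAuthenticator:
        return StubAuthenticator(self.config)

    @property
    def http_headers(self) -> dict:
        headers = {"Content-Type": "application/json"}  # assumed default header
        headers.update(self.authenticator.auth_headers)  # add the bearer token
        return headers


sink = StubBaseSink({"access_token": "abc123"})
print(sink.http_headers["Authorization"])  # Bearer abc123
```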

auth.py

This class is responsible for generating an access token to make requests to the API. The auth class should be called from the base sink class.

Notes

  • For OAuth with a refresh token, the target only handles refreshing the token. The authorization flow must be completed beforehand, and the refresh_token must already be in the config file.
  • In hotglue, the authorization process is handled out of the box.

Main properties

  • auth_headers: Checks whether the token is valid, requests a new token if it is not, and formats the access token. It should return a dict with the Authorization key and the access_token to send with each request made to the API.

  • oauth_request_body: It builds the payload to be sent when making the request to obtain a new access token.

Main functions

  • is_token_valid: Checks whether the token can still be used. It should return a boolean.

  • update_access_token: It makes a request to generate a new access token.
    With rotating refresh tokens, or when the new access token must be saved to the config file, add an __init__ method to the Auth class, the Target class and the base sink class.
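
The validity check can be sketched as a pure function, which makes the expiry buffer easy to test. This version assumes that expires_in in the config holds an absolute Unix timestamp (the moment the token expires), and the 120-second buffer mirrors the sample that follows; the constant name and the optional now parameter are introduced here only for illustration.

```python
import time
from typing import Optional

EXPIRY_BUFFER_SECONDS = 120  # refresh slightly before the token actually expires


def is_token_valid(config: dict, now: Optional[int] = None) -> bool:
    """Return True when an access token exists and is not about to expire.

    Assumes config["expires_in"] holds an absolute Unix timestamp.
    """
    now = int(time.time()) if now is None else now
    access_token = config.get("access_token")
    expires_at = config.get("expires_in")
    if not access_token or not expires_at:
        return False
    return int(expires_at) - now >= EXPIRY_BUFFER_SECONDS


now = 1_700_000_000
print(is_token_valid({"access_token": "abc", "expires_in": now + 3600}, now))  # True
print(is_token_valid({"access_token": "abc", "expires_in": now + 60}, now))  # False
print(is_token_valid({"expires_in": now + 3600}, now))  # False
```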

Sample auth.py

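import json
import logging
from datetime import datetime
from typing import Any, Dict, Optional

import requests


class ExampleAuthenticator: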
    """API Authenticator for OAuth 2.0 flows."""

    def __init__(
        self,
        target,
        auth_endpoint: Optional[str] = None,
    ) -> None:
        """Init authenticator.

        Args:
            target: The target instance that owns this authenticator.
            auth_endpoint: URL of the endpoint that issues access tokens.
        """
        self.target_name: str = target.name
        self._config: Dict[str, Any] = target._config
        self._auth_headers: Dict[str, Any] = {}
        self._auth_params: Dict[str, Any] = {}
        self.logger: logging.Logger = target.logger
        self._auth_endpoint = auth_endpoint
        self._config_file = target.config_file
        self._target = target

    @property
    def auth_headers(self) -> dict:
        if not self.is_token_valid(): # validate token
            self.update_access_token() #update token if not valid
        result = {}
        result["Authorization"] = f"Bearer {self._config.get('access_token')}"
        return result

    @property
    def oauth_request_body(self) -> dict:
        """Define the OAuth request body"""
        return {
            "refresh_token": self._config["refresh_token"],
            "grant_type": "refresh_token",
            "client_id": self._config["client_id"],
            "client_secret": self._config["client_secret"],
        }

    def is_token_valid(self) -> bool:
        access_token = self._config.get("access_token")
        now = round(datetime.utcnow().timestamp())
        expires_in = self._config.get("expires_in")
        if expires_in is not None:
            expires_in = int(expires_in)
        if not access_token:
            return False
        if not expires_in:
            return False
        return not ((expires_in - now) < 120)

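    def update_access_token(self) -> None:
        token_response = requests.post(
            self._auth_endpoint, data=self.oauth_request_body, headers={}
        )
        token_response.raise_for_status()  # fail loudly if the refresh is rejected
        token_json = token_response.json()
        self.access_token = token_json["access_token"]
        self._config["access_token"] = token_json["access_token"]
        # Keep the current refresh token when the API does not rotate it
        self._config["refresh_token"] = token_json.get(
            "refresh_token", self._config["refresh_token"]
        )
        # Store an absolute expiry timestamp, because is_token_valid compares it with "now"
        now = round(datetime.utcnow().timestamp())
        self._config["expires_in"] = now + int(token_json["expires_in"])

        # Persist the refreshed credentials so the next run can reuse them
        with open(self._target.config_file, "w") as outfile:
            json.dump(self._config, outfile, indent=4)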

sinks.py

Create one sink per endpoint in sinks.py.

Main functions

Most of the target logic is defined in the target-hotglue SDK. In most cases only 2 functions need to be implemented.

  • preprocess_record: This function reads each record from data.singer and builds the payload that will be sent to the API. This is where you map fields or add custom logic to build the payload.
    It should return the payload to be sent to the API.

  • upsert_record: Receives the payload returned by preprocess_record and sends it to the API.
    It should return id, status, state.

    • id: the API provided id of the created or updated record
    • status: a boolean that indicates whether the record was sent successfully
    • state: data to be written to target-state.json, commonly an empty dict

If the record was updated instead of created we can add it to the state:

state["is_updated"] = True

Note: an exception or error in the process_record function won’t stop the target. The error is written to the target-state.json file together with the externalId provided in data.singer, and the target continues with the next record.
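
The flow described above can be modeled in a few lines of plain Python. This is a simplified sketch of the behavior, not the SDK's implementation: the standalone functions, the fake ids and the exact keys written to each bookmark entry ("error", "externalId") are assumptions made for the example.

```python
def preprocess_record(record: dict) -> dict:
    """Build the payload; raises KeyError when the required name is missing."""
    return {"Name": record["name"], "Currency": record.get("currency")}


def upsert_record(payload: dict):
    """Stand-in for the API call; returns (id, status, state_updates)."""
    if payload["Name"] == "existing":
        return "C-1", True, {"is_updated": True}  # pretend the API updated a record
    return "C-2", True, {}


def run(records: list) -> dict:
    """Process every record and record failures instead of stopping."""
    state = {"bookmarks": [], "summary": {"success": 0, "fail": 0, "updated": 0}}
    for record in records:
        entry = {"externalId": record.get("externalId")}
        try:
            record_id, success, updates = upsert_record(preprocess_record(record))
            entry.update({"id": record_id, "success": success, **updates})
        except Exception as exc:
            # The error is stored with the externalId and the loop continues
            entry.update({"success": False, "error": repr(exc)})
        state["bookmarks"].append(entry)
        if not entry["success"]:
            state["summary"]["fail"] += 1
        elif entry.get("is_updated"):
            state["summary"]["updated"] += 1
        else:
            state["summary"]["success"] += 1
    return state


result = run([
    {"externalId": "1", "name": "new", "currency": "USD"},
    {"externalId": "2"},  # missing name, so preprocess_record fails
    {"externalId": "3", "name": "existing"},
])
print(result["summary"])  # {'success': 1, 'fail': 1, 'updated': 1}
```

The second record fails, yet the third one is still processed, which is the behavior the note describes.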

Samples

class ContactsSink(ExampleSink):

    endpoint = "/contacts"
    name = "Contacts" # should have the same name as the record that will be ingested

Add the preprocess_record method to the ContactsSink class. It takes the input data and transforms it into the payload that is passed to the endpoint.

class ContactsSink(ExampleSink):

    endpoint = "/contacts"
    name = "Contacts"

    def preprocess_record(self, record: dict, context: dict) -> dict:
        # Map the incoming record fields to the names the API expects
        payload = {
            "Name": record.get("name"),
            "LastName": record.get("last_name"),
            "Currency": record.get("currency"),
        }

        return payload

Add the upsert_record method to the ContactsSink class. Here the payload created by preprocess_record is sent to the API.

def upsert_record(self, record: dict, context: dict):
    # state data saved alongside the status and the id of the newly created contact
    state_updates = dict()

    # basic request structure
    if record:
        response = self.request_api(
            "POST", endpoint=self.endpoint, request_data=record
        )
        contact_id = response.json()["id"]
        # returning these three values is required, or the record will be considered failed
        return contact_id, True, state_updates

target-state.json

The target state stores the bookmarks, which hold, for each stream, an array of all the outputs returned by upsert_record. Each entry holds the id of the newly created or updated record or, if an error occurred, the reason for the error and the externalId that identifies the failed record.

It also stores the summary, which is a count of 4 types of records:

  • success: Count of created records. It also includes updated records when is_updated is not set in the upsert_record function.
  • fail: Count of records that failed to be created or updated.
  • existing: Count of duplicated records that were read. The target uses the externalId to determine this.
  • updated: Count of updated records. This count only works if is_updated is set in upsert_record.

Example:

{
    "bookmarks": {
        "Bills": [
            {
                "hash": "994e64ce06a996df7c24fc70543969a33352efe2114805e49ad01ca42e6862c5",
                "success": true,
                "id": "B-7890"
            }
        ]
    },
    "summary": {
        "Bills": {
            "success": 1,
            "fail": 0,
            "existing": 0,
            "updated": 0
        }
    }
}
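
After a run, it can be useful to pull the failed records out of the state file. The sketch below does this for an inline sample state. The keys used for failed entries ("externalId" and "error") are assumptions based on the description above and may be named differently in a real target-state.json; failed_records is a hypothetical helper.

```python
import json

# Inline sample; in practice this would be read from target-state.json
state_json = """
{
    "bookmarks": {
        "Bills": [
            {"hash": "abc", "success": true, "id": "B-7890"},
            {"hash": "def", "success": false, "externalId": "12346", "error": "400 Bad Request"}
        ]
    },
    "summary": {"Bills": {"success": 1, "fail": 1, "existing": 0, "updated": 0}}
}
"""


def failed_records(state: dict) -> list:
    """Return (stream, externalId, error) for every bookmark that did not succeed."""
    failures = []
    for stream, entries in state.get("bookmarks", {}).items():
        for entry in entries:
            if not entry.get("success"):
                failures.append((stream, entry.get("externalId"), entry.get("error")))
    return failures


state = json.loads(state_json)
print(failed_records(state))  # [('Bills', '12346', '400 Bad Request')]
```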

Testing the target locally

Input Files

config.json

This file stores all the credentials needed to send data to the API. Flags or additional data for customized logic can also be saved here.

Example:


{
    "username": "user_12345&",
    "password": "asdzxc098",
    "required_header_value": "header_required"
}

data.singer

All Singer SDK targets accept JSONL (JSON Lines) input made of Singer-formatted messages. We typically call this file data.singer.

Note: Each record should have an externalId field. It serves as a primary key for updating the target-state if errors happen.

Example:

{"type": "SCHEMA", "stream": "Bills", "schema": {"type": ["object", "null"], "properties": {"vendorId": {"type": ["string", "null"]}, "totalAmount": {"type": ["number", "null"]}, "currency": {"type": ["string", "null"]}, "externalId": {"type": ["string", "null"]}}}, "key_properties": []}
{"type": "RECORD", "stream": "Bills", "record": {"vendorId": "V-1234", "totalAmount": 23.56, "currency": "USD", "externalId": "12345"}}
{"type": "STATE", "value": {}}
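
When preparing test data, it can be easier to generate data.singer from Python dicts than to write it by hand. The sketch below builds the same three messages as the example above; singer_lines is a hypothetical helper, and writing the result to a file is left as a comment.

```python
import json

schema = {
    "type": ["object", "null"],
    "properties": {
        "vendorId": {"type": ["string", "null"]},
        "totalAmount": {"type": ["number", "null"]},
        "currency": {"type": ["string", "null"]},
        "externalId": {"type": ["string", "null"]},
    },
}
records = [
    {"vendorId": "V-1234", "totalAmount": 23.56, "currency": "USD", "externalId": "12345"},
]


def singer_lines(stream: str, schema: dict, records: list) -> list:
    """Build SCHEMA, RECORD and STATE messages as JSON strings, one per line."""
    messages = [{"type": "SCHEMA", "stream": stream, "schema": schema, "key_properties": []}]
    messages += [{"type": "RECORD", "stream": stream, "record": r} for r in records]
    messages.append({"type": "STATE", "value": {}})
    return [json.dumps(message) for message in messages]


lines = singer_lines("Bills", schema, records)
print("\n".join(lines))
# To save the file: open("data.singer", "w").write("\n".join(lines) + "\n")
```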

Running the target

To run the target, pipe data.singer into the target command (target-salesforce is used as the example target name here):

cat data.singer | target-salesforce --config config.json

You can also set this up as a debug configuration in VS Code by adding a definition like this to launch.json:

{
    "name": "target-salesforce",
    "type": "python",
    "request": "launch",
    "justMyCode": false,
    "program": "../target_salesforce/target.py",
    "cwd": "targets/target-salesforce/.secrets",
    "args": [
        "<",
        "data.singer",
        "--config",
        "config.json"
    ],
    "console": "integratedTerminal",
    "python": ".env/target-salesforce/bin/python"
}