Search documentation...

K

Airflow Operator

Overview

Hightouch provides an Airflow Operator for triggering a sync via Airflow. Our git repo contains the latest code as well as example dags to help you get started.

Setup

Create an API Token

The first step is to create an API Token in your workspace settings. Once you click the Create API Key button, you'll be prompted for a name and presented with an API key. You'll need to save this key and enter it in the Airflow connections window as shown in the next step.

Your API key provides read/write access to sensitive Hightouch resources and should be kept secure. If you believe your API key may have been compromised, be sure to revoke it immediately.

Add the Airflow connection

In Airflow, create a new connection by going to Admin > Connections and clicking the + to add a new connection. Name the connection hightouch_default and set the host to https://api.hightouch.com.

Enter the token from Step 1 in the Password field

Find the Sync ID

Within the Hightouch Sync page, click the sync you wish to trigger, then click on Schedule. The Sync ID will be displayed, you will use this in your Operator.

Install the Airflow package

While the details will vary for your particular Airflow installation, the simplest way to install our package is through pip

pip install airflow-provider-hightouch

Add the HightouchTriggerSyncOperator to your DAG

The last step is to add the operator to a DAG and add the Sync ID from Step 3. When the task is run, Airflow will send a call to the Hightouch API to trigger a run which will complete asynchronously.

Alternatively, you may choose to send a synchronous request and await the results of the sync before completing the task. For a full description of the operator parameters, view the source code on our Github page.

from airflow_provider_hightouch.operators.hightouch \
    import HightouchTriggerSyncOperator

with DAG(....) as dag:
...
    # This task will submit the request but not wait for completion of the
    # sync before completing.
    my_async_task = HightouchTriggerSyncOperator(
        task_id="run_my_sync", sync_id=1234
        )

    # This task will wait for the sync status to complete or error before
    # completing. Warnings can be treated as errors or successes depending
    # on your use case.
    my_sync_task = HightouchTriggerSyncOperator(
        task_id="run_sync", sync_id=123, synchronous=True, error_on_warning=True
    )

    Need help?

    Our team is relentlessly focused on your success. We're ready to jump on a call to help unblock you.

    • Connection issues with your data warehouse?
    • Confusing API responses from destination systems?
    • Unsupported destination objects or modes?
    • Help with complex SQL queries?

    Feature Requests?

    If you see something that's missing from our app, let us know and we'll work with you to build it!

    We want to hear your suggestions for new sources, destinations, and other features that would help you activate your data.

On this page

OverviewSetupCreate an API TokenAdd the Airflow connectionFind the Sync IDInstall the Airflow packageAdd the HightouchTriggerSyncOperator to your DAG

Was this page helpful?