How to create custom Airflow Operators

Disclaimer: This article is intended for users who already have some hands-on experience with Airflow. If that is not your case, I am working on an Airflow Essentials Survival Kit guide; the link will be posted here as soon as it is released.

To create a custom Airflow Operator, all you need to do is import the Airflow BaseOperator and subclass it with the parameters and logic you need. You just have to fill in the template below:

from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults

class MyCustomOperator(BaseOperator):

    # add the templated param(s) your custom operator takes:
    template_fields = ("your_custom_templated_field", )

    @apply_defaults
    def __init__(
        self,
        your_custom_field,
        your_custom_templated_field,
        *args,
        **kwargs
    ):
        super().__init__(*args, **kwargs)

        # assign the normal and templated params:
        self.your_custom_field = your_custom_field
        self.your_custom_templated_field = your_custom_templated_field

    def execute(self, context):
        # the logic to perform goes here
        ...

Note: The execute() method and context argument are mandatory.
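For illustration, here is a minimal sketch of what the context dictionary exposes inside execute(); the key names below are standard Airflow context entries:

class MyCustomOperator(BaseOperator):
    ...

    def execute(self, context):
        # read runtime information from the Airflow context
        execution_date = context["ds"]  # logical date as a "YYYY-MM-DD" string
        task_instance = context["ti"]   # the running TaskInstance
        self.log.info("Running for %s", execution_date)
        # push a value for downstream tasks via XCom
        task_instance.xcom_push(key="processed_date", value=execution_date)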

Then, assuming you store this module in a separate file, you can import and use it inside your DAG file:

from path.to.file import MyCustomOperator

my_custom_job = MyCustomOperator(
    task_id="my_custom_operator",
    your_custom_templated_field="E.g. {{ ds }}",
    your_custom_field=42,
)

Notes:

  • Because you have listed it in template_fields, your_custom_templated_field accepts Jinja templated variables like {{ds}}. Declaring template_fields is optional, though.
  • You can have more than one custom field.
  • Not all of an operator’s parameters accept Jinja templated values. Check the documentation to see which ones are templated, e.g. for BigQueryInsertJobOperator; see also the snippet below.
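If you would rather check in code than in the documentation, every operator exposes its templated parameters through the template_fields class attribute. A minimal sketch, assuming the Google provider package is installed:

from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryInsertJobOperator,
)

# Prints the tuple of parameters that accept Jinja templating,
# e.g. "configuration" for BigQueryInsertJobOperator.
print(BigQueryInsertJobOperator.template_fields)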

Example: Ingesting Data From API

Context: you have data sitting behind an API. You want to fetch data from this API and ingest it into Google Cloud BigQuery. In between, you store the fetched raw data as temporary JSON files inside a Google Cloud Storage bucket. The next step is then to flush the files’ content into a BigQuery table.

To do so, you need to create a custom Airflow Operator that uses your API client to fetch the data, retrieving the credentials from an Airflow Connection, and stores the retrieved data in a temporary JSON file on Google Cloud Storage.

Your custom-made Airflow Operator, stored in a file such as api_ingest/core.py, will look like this:

import json
from typing import Any
from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.hooks.base_hook import BaseHook
from custom_api_client.client import (
    CustomAPIClient,
)

class APIToGCSOperator(
    BaseOperator
): # pylint: disable=too-few-public-methods
    """
    Custom-made Airflow Operator fetching data from the API.
    """

    template_fields = (
        "destination_file",
        "ingested_at"
    )

    @apply_defaults
    def __init__(
        self,
        destination_file: str,
        ingested_at: str,
        entity: str,
        *args: Any,
        **kwargs: Any
    ) -> None:
        super().__init__(*args, **kwargs)

        self.destination_file = destination_file
        self.ingested_at = ingested_at
        self.entity = entity

    def execute(
        self,
        context: Any
    ) -> None:  # pylint: disable=unused-argument
        """
        Fetches data from the API and writes into GCS bucket.

        1. Reads the credentials stored into the Airflow Connection.
        2. Instantiates the API client.
        3. Fetches the data with the given parameters.
        4. Flushes the result into a temporary GCS bucket.
        """

        api_connection = BaseHook.get_connection("your_connection_name")
        credentials = api_connection.extra_dejson
        client_id = credentials["client_id"]
        client_secret = credentials["client_secret"]
        refresh_token = credentials["non_expiring_refresh_token"]

        custom_api_client = CustomAPIClient(
            client_id=client_id,
            client_secret=client_secret,
            refresh_token=refresh_token,
        )

        with open(
            self.destination_file,
            "w",
            encoding="utf-8"
        ) as output:

            fetched_data_json = custom_api_client.fetch_entity(
                entity=self.entity
            )

            entity_json = dict(
                content=json.dumps(
                    fetched_data_json,
                    default=str,
                    ensure_ascii=False,
                ),
                ingested_at=self.ingested_at,
            )

            json.dump(
                entity_json,
                output,
                default=str,
                ensure_ascii=False
            )
            output.write("\n")

Note: You can create your own custom-made API client. To make it available inside your Airflow DAG, upload the package into Airflow's plugins folder beforehand.
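For illustration, the plugins folder could be laid out as follows (the exact structure is up to you, as long as both packages are importable):

plugins/
├── api_ingest/
│   ├── __init__.py
│   └── core.py            # defines APIToGCSOperator
└── custom_api_client/
    ├── __init__.py
    └── client.py          # defines CustomAPIClient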

In the main DAG module, inside the DAG's with context manager, it will look like this:

from airflow.operators.bash import BashOperator
from api_ingest.core import APIToGCSOperator

staging_dir = "{{var.value.local_storage_dir}}" + "/tempfiles/api_ingest"

create_staging_dir = BashOperator(
    task_id=f"create-staging-dir",
    bash_command=f"mkdir -p {staging_dir}"
)

cleanup_staging_dir = BashOperator(
    task_id=f"cleanup-staging-dir",
    bash_command=f"rm -rf {staging_dir}"
)

api_to_gcs = APIToGCSOperator(
    task_id="api-to-gcs",
    destination_file=staging_dir + "/data_{{ds_nodash}}.mjson",
    ingested_at="{{ts}}",
    entity="campaigns",
)
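To complete the pipeline described in the context above, you still need to upload the staged file to Google Cloud Storage, load it into BigQuery, and wire the tasks together. Below is a minimal sketch assuming the Google provider package is installed; the bucket, dataset and table names are placeholders, not values from this article:

from airflow.providers.google.cloud.transfers.local_to_gcs import (
    LocalFilesystemToGCSOperator,
)
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)

# Hypothetical names: bucket, dataset and table are placeholders.
upload_to_gcs = LocalFilesystemToGCSOperator(
    task_id="upload-to-gcs",
    src=staging_dir + "/data_{{ds_nodash}}.mjson",
    dst="api_ingest/data_{{ds_nodash}}.mjson",
    bucket="your-temporary-bucket",
)

gcs_to_bigquery = GCSToBigQueryOperator(
    task_id="gcs-to-bigquery",
    bucket="your-temporary-bucket",
    source_objects=["api_ingest/data_{{ds_nodash}}.mjson"],
    destination_project_dataset_table="your_dataset.your_table",
    source_format="NEWLINE_DELIMITED_JSON",
    write_disposition="WRITE_APPEND",
)

# Order the tasks: create the staging dir, fetch, upload, load, then clean up.
create_staging_dir >> api_to_gcs >> upload_to_gcs >> gcs_to_bigquery >> cleanup_staging_dir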

Note: It is more efficient to use {{var.value.your_variable}} than Variable.get("your_variable"), because the latter is evaluated every time the scheduler parses the DAG file. The downsides are that the actual value is only substituted at execution time, and only in fields that accept Jinja templated variables.
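As a quick illustration of the difference, using the same Airflow Variable as above:

from airflow.models import Variable

# Evaluated every time the scheduler parses the DAG file:
staging_root = Variable.get("local_storage_dir")

# Rendered by Jinja only at execution time, and only inside templated fields:
staging_root_templated = "{{ var.value.local_storage_dir }}"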

And there we go: you should now be able to create your own custom Airflow Operators! Have fun crafting Airflow and tuning it to your liking. 💫
