Test Airflow DAG locally

Installing the Python libraries

Airflow DAGs can be tested locally and integrated into your unit-test workflow.

For that, apache-airflow and pytest are the only Python packages you need to install with pip.

First, import the libraries and retrieve the directory that contains the test file:

from pathlib import Path
from airflow.models import DagBag
from unittest.mock import patch
import pytest

SCRIPT_DIRECTORY = Path(__file__).parent

Collecting the DAGs in the DagBag

Second, collect all the local DAGs under your dags/ folder that you want to test. You can use airflow.models.DagBag for this, wrapped in a dedicated dag_bag fixture:

@pytest.fixture()
def dag_bag() -> DagBag:
    dag_folder = SCRIPT_DIRECTORY / ".." / "dags"
    dag_bag = DagBag(
        dag_folder=dag_folder,
        read_dags_from_db=False,
    )
    return dag_bag

This fixture returns a collection of DAGs, parsed from the local DAG folder you specified.

Note: the function above is tailored to a project with the following structure:

airflow-dag-repo
├── dags # all your dags go there
│   └── dag.py
├── airflow_dag_repo
│   ├── __init__.py
│   └── commons.py
├── tests
│   └── test_dag.py
├── poetry.lock
└── pyproject.toml
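To see how the dag folder path is derived for this layout, here is a minimal sketch using only pathlib. The absolute path /airflow-dag-repo/tests/test_dag.py is a made-up location standing in for `__file__`; your repository will live somewhere else.

```python
from pathlib import PurePosixPath

# Hypothetical location of the test module, mirroring the layout above.
test_file = PurePosixPath("/airflow-dag-repo/tests/test_dag.py")
script_directory = test_file.parent

# Same expression as in the fixture: the ".." segment is kept as written.
dag_folder = script_directory / ".." / "dags"

# Equivalent location, expressed without the ".." segment.
dag_folder_clean = script_directory.parent / "dags"

print(dag_folder)
print(dag_folder_clean)
```

Both paths point at the same folder. The second form is simply easier to read in logs and error messages.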

Optional: I use Poetry as my Python package manager; you can learn more about it here.

Note: the fixture decorator is a setup tool that initializes reusable objects in one place and passes them to your test functions as arguments. Here, every test function in the module can request the dag_bag object by naming it as a parameter.

Running the test suite on the collected DAGs

Finally, you can implement your tests:

def test_dag_tasks_count(dag_bag):
    dag = dag_bag.get_dag(dag_id="your-dag-id")
    # get_dag returns None when the DAG was not found or failed to import
    assert dag is not None
    assert dag.task_count == 4

def test_dags_import_errors(dag_bag):
    assert dag_bag.import_errors == {}

You can check the full example on GitHub: airflow-dag-unittests

Note: you can wrap your test functions in a class based on unittest.TestCase, as I did in the codebase at github.com/olivierbenard/airflow-dag-unittests. However, doing so prevents you from passing fixtures as test arguments. A workaround exists; I will let you check what I did.
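One possible way to share a setup object inside a unittest.TestCase, not necessarily the one used in the repository, is the standard setUpClass hook. The sketch below uses a hypothetical build_dag_bag helper that returns a plain dictionary so that it runs without Airflow; in a real suite it would return the DagBag instead.

```python
import unittest


def build_dag_bag():
    # Hypothetical stand-in: a real suite would return
    # DagBag(dag_folder=..., read_dags_from_db=False) here.
    return {"import_errors": {}, "dag_ids": ["your-dag-id"]}


class TestDags(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        # Built once and shared by every test method, much like a fixture.
        cls.dag_bag = build_dag_bag()

    def test_no_import_errors(self):
        self.assertEqual(self.dag_bag["import_errors"], {})

    def test_dag_is_collected(self):
        self.assertIn("your-dag-id", self.dag_bag["dag_ids"])


# Run the test case programmatically and keep the result object.
suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestDags)
result = unittest.TextTestRunner(verbosity=0).run(suite)
```

The trade-off is that setUpClass runs once per class, so tests should treat the shared object as read-only.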

Mocking Airflow Variables

If you are using Airflow Variables in your DAGs, e.g.:

from airflow.models import Variable
MY_VARIABLE = Variable.get("my-variable")

You need to mock the variable through an environment variable on the dag_bag fixture:

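@pytest.fixture()  # keep the fixture decorator outermost so pytest registers the patched function
@patch.dict(
    "os.environ",
    # Airflow looks up AIRFLOW_VAR_ followed by the variable key in upper case.
    {"AIRFLOW_VAR_MY-VARIABLE": ""},
)
def dag_bag() -> DagBag:
    ...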

Otherwise, you will stumble across the following error during your local tests:

raise KeyError(f"Variable {key} does not exist")
KeyError: 'Variable <your-variable> does not exist'
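To make the mocking mechanism more concrete, here is a small sketch that runs without Airflow. The helper airflow_var_env_name is hypothetical; it only mirrors Airflow's documented convention of reading a Variable from AIRFLOW_VAR_ followed by the key in upper case. The value "dummy" is an arbitrary placeholder.

```python
import os
from unittest.mock import patch


def airflow_var_env_name(key: str) -> str:
    # Hypothetical helper mirroring Airflow's naming convention:
    # AIRFLOW_VAR_ + the variable key in upper case.
    return "AIRFLOW_VAR_" + key.upper()


env_name = airflow_var_env_name("my-variable")

# The variable only exists while the patch is active, which is why the
# DagBag must be built inside the patched scope.
with patch.dict("os.environ", {env_name: "dummy"}):
    inside = os.environ.get(env_name)

# Once the patch exits, the environment is restored.
outside = os.environ.get(env_name)
```

Because the patch is scoped, any DAG file that calls Variable.get at import time has to be parsed while the patch is still in place.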

To conclude, Airflow DAGs are always a headache to test and integrate within your unittest workflow. I hope this makes it easier.
