Installing the Python libraries
Airflow DAGs can be tested and integrated into your unit-test workflow. For that, apache-airflow and pytest are the only Python pip packages you need.
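If they are not already available in your environment, you can install them with pip (adapt the command to your own package manager):

pip install apache-airflow pytest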
First, import the libraries and retrieve the directory of the current test file:
from pathlib import Path
from airflow.models import DagBag
from unittest.mock import patch
import pytest
SCRIPT_DIRECTORY = Path(__file__).parent
Collecting the DAGs in the DagBag
Second, you want to collect all the local DAGs under your dags/ folder that you want to test. For that, you can use airflow.models.DagBag and create a dedicated dag_bag fixture:
@pytest.fixture()
def dag_bag() -> DagBag:
    dag_folder = SCRIPT_DIRECTORY / ".." / "dags"
    dag_bag = DagBag(
        dag_folder=dag_folder,
        read_dags_from_db=False,
    )
    return dag_bag
This fixture returns a collection of DAGs, parsed from the local DAG folder tree you have specified.
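As a minimal sketch, here are the DagBag attributes you will typically inspect in your tests (the dag id below is a placeholder):

dag_bag.dags                           # dict mapping dag_id -> DAG object
dag_bag.import_errors                  # dict mapping file path -> parsing error
dag_bag.get_dag(dag_id="your-dag-id")  # fetch a single parsed DAG by its id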
Note: the dag_bag fixture above is tailored for a project with the following structure:
airflow-dag-repo
├── dags # all your dags go there
│   └── dag.py
├── airflow_dag_repo
│   ├── __init__.py
│   └── commons.py
├── tests
│   └── test_dag.py
├── poetry.lock
└── pyproject.toml
Optional: I use poetry as my Python package manager; you can learn more about it here.
Note: the fixture decorator is used as a setup tool to initialize reusable objects in one place and pass them to your test functions as arguments. Here, the dag_bag object can now be accessed by all the test functions in that module.
Running the test suite on the collected DAGs
Finally, you can implement your tests:
def test_dag_tasks_count(dag_bag):
    dag = dag_bag.get_dag(dag_id="your-dag-id")
    assert len(dag.tasks) == 4

def test_dags_import_errors(dag_bag):
    assert dag_bag.import_errors == {}
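You can also assert on the DAG structure itself, for instance on task dependencies. Here is a minimal sketch, assuming hypothetical task ids extract and transform:

def test_dag_task_dependencies(dag_bag):
    dag = dag_bag.get_dag(dag_id="your-dag-id")
    # "extract" and "transform" are placeholder task ids, adapt them to your DAG
    assert "transform" in dag.get_task("extract").downstream_task_ids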
You can check the full example on GitHub: airflow-dag-unittests
Note: you can wrap up your test functions within a class using unittest.TestCase, as I did in the codebase on github.com/olivierbenard/airflow-dag-unittests. However, doing so prevents you from using fixtures directly. A workaround exists; I will let you check what I did.
Mocking Airflow Variables
If you are using Airflow Variables in your DAGs, e.g.:
from airflow.models import Variable
MY_VARIABLE = Variable.get("my-variable")
You need to add the following lines:
@patch.dict(
    "os.environ",
    AIRFLOW_VAR_YOUR_VARIABLE="",  # mock your variable, prefixed with AIRFLOW_VAR_
)
@pytest.fixture()
def dag_bag() -> DagBag:
    ...
Otherwise, you will stumble across the following error during your local tests:
raise KeyError(f"Variable {key} does not exist")
KeyError: 'Variable <your-variable> does not exist'
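Alternatively, if you prefer to stay within pytest, the same effect can be achieved with the built-in monkeypatch fixture instead of patch.dict; a minimal sketch, reusing the placeholder variable name from above:

@pytest.fixture()
def dag_bag(monkeypatch) -> DagBag:
    # AIRFLOW_VAR_YOUR_VARIABLE is a placeholder, adapt it to your variable name
    monkeypatch.setenv("AIRFLOW_VAR_YOUR_VARIABLE", "")
    return DagBag(
        dag_folder=SCRIPT_DIRECTORY / ".." / "dags",
        read_dags_from_db=False,
    )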
To conclude, Airflow DAGs are always a headache to test and integrate into your unit-test workflow. I hope this makes it easier.