Upload Airflow DAG to DAGs folder (hosted on Google Cloud Composer)

Once you have your DAG defined in a nice dag.py file, you need to transfer it from your local environment to the DAGs/ folder of your Airflow instance.

airflow-dag-repo
├── dags
│   └── dag.py
├── airflow_dag_repo
│   ├── __init__.py
│   └── utils.py
├── poetry.lock
└── pyproject.toml

This can be done manually from the command line, replacing the GitLab $airflow_gcs_bucket_dev environment variable with the storage location your running Airflow instance relies on:

gsutil cp -r ./dags* gs://$airflow_gcs_bucket_dev/dags/airflow_dag_repo/

In our case, the Airflow instance runs on Cloud Composer, the managed Airflow service of Google Cloud Platform, which uses Cloud Storage as its storage service. Hence the use of gsutil, which lets us access Cloud Storage (where our DAGs/ folder is located).

The second-best option would be to run this command as part of your GitLab CI/CD pipeline.
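
As a rough sketch of what such a pipeline step could call, the manual command can be wrapped in a small shell function. The function name upload_dags and the DRY_RUN switch are assumptions made for this example, not part of the original setup; only the gsutil command and the target path come from the command above.

```shell
# upload_dags: hypothetical wrapper around the manual gsutil command.
# $1 is the bucket name, e.g. the value of $airflow_gcs_bucket_dev.
upload_dags() {
    bucket="${1:-}"
    if [ -z "$bucket" ]; then
        echo "error: bucket name is empty" >&2
        return 1
    fi
    target="gs://${bucket}/dags/airflow_dag_repo/"
    if [ "${DRY_RUN:-0}" = "1" ]; then
        # DRY_RUN is an assumed switch: print the command instead of running it.
        echo "gsutil cp -r ./dags* ${target}"
    else
        gsutil cp -r ./dags* "$target"
    fi
}
```

A CI job would then call upload_dags "$airflow_gcs_bucket_dev" once it has authenticated against Google Cloud. Failing early on an empty bucket name avoids uploading to a malformed gs:// path when the variable is missing.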

However, the most advanced and preferred option (like, the ultimate no-brainer) would be to use Terraform.

Go the extra mile

You want to reduce the workload of the Airflow DAG parser. To improve parsing time, send zip-compressed files and remove unnecessary noise, e.g. by pruning cache folders:

zip -r airflow_dag_repo.zip ./dags ./airflow_dag_repo -x "*__pycache__*"
gsutil cp airflow_dag_repo.zip gs://$airflow_gcs_bucket_dev/dags/
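
To check what would actually be shipped once cache folders are pruned, the file list can be previewed first. This is only a sketch: list_payload is a hypothetical helper name, and it relies on nothing beyond find and sort.

```shell
# list_payload: hypothetical helper that lists the files to ship,
# skipping every __pycache__ folder.
# "$@" holds the folders to scan, e.g. ./dags ./airflow_dag_repo
list_payload() {
    # Prune directories named __pycache__, print every remaining regular file.
    find "$@" -type d -name '__pycache__' -prune -o -type f -print | sort
}
```

The resulting list could also be piped into zip, which reads file names from standard input with the -@ option: list_payload ./dags ./airflow_dag_repo | zip airflow_dag_repo.zip -@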
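
With Terraform, the files are uploaded one by one as bucket objects, and __pycache__ folders are filtered out:

locals {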
    dags = fileset("${var.root_dir}/../../dags/", "**")
    code = toset([
        for file in fileset("${var.root_dir}/../../airflow_dag_repo/", "**"):
            file if length(regexall(".*__pycache__.*", file)) == 0
    ])
    upload_folder = replace(var.service_name, "-", "_")
}

resource "google_storage_bucket_object" "dags" {
    for_each = local.dags
    name   = "dags/${local.upload_folder}/${each.key}"
    source = "${var.root_dir}/../../dags/${each.key}"
    bucket = data.google_secret_manager_secret_version.airflow_bucket.secret_data
}

resource "google_storage_bucket_object" "code" {
    for_each = local.code
    name   = "dags/${local.upload_folder}/${each.key}"
    source = "${var.root_dir}/../../airflow_dag_repo/${each.key}"
    bucket = data.google_secret_manager_secret_version.airflow_bucket.secret_data
}

Note: service_name is inferred by Terragrunt from the Git remote URL:

locals {
    git_remote_origin_url = run_cmd("--terragrunt-quiet", "git", "config", "--get", "remote.origin.url")
    service_name = run_cmd("--terragrunt-quiet", "basename", "-s", ".git", local.git_remote_origin_url)
}
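
To make the naming chain concrete, here is a minimal shell sketch of what the two run_cmd calls and the Terraform replace() compute. The function names and the example URL in the comments are assumptions for illustration only.

```shell
# derive_service_name: repository name taken from the remote URL.
# Uses the POSIX spelling of "basename -s .git".
# Example (hypothetical URL): git@gitlab.com:group/airflow-dag-repo.git
derive_service_name() {
    basename "$1" .git
}

# to_upload_folder: same effect as replace(var.service_name, "-", "_")
# in the Terraform locals block.
to_upload_folder() {
    printf '%s\n' "$1" | tr '-' '_'
}
```

For a repository named airflow-dag-repo, the service name stays airflow-dag-repo and the upload folder becomes airflow_dag_repo, which matches the Python package folder in the repository.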

The repository then looks like this:

airflow-dag-repo
├── dags
│   └── dag.py
├── airflow_dag_repo
│   ├── __init__.py
│   └── utils.py
├── terraform
│   ├── sync_dags.tf
│   ├── tf_variables.tf
│   └── versions.tf
├── terragrunt
│   ├── dev
│   │   ├── env.hcl
│   │   ├── env.tfvars
│   │   └── terragrunt.hcl
│   ├── prod
│   │   ├── env.hcl
│   │   ├── env.tfvars
│   │   └── terragrunt.hcl
│   └── terragrunt.hcl
├── poetry.lock
└── pyproject.toml
