Once you have your DAG defined in a nice dag.py
file, you need to transfer it from your local environment to the DAGs/ folder of your Airflow instance.
airflow-dag-repo
├── dags
│   └── dag.py
├── airflow_dag_repo
│   ├── __init__.py
│   └── utils.py
├── poetry.lock
└── pyproject.toml
This can be done manually via the command-line interface, replacing the GitLab $airflow_gcs_bucket_dev
environment variable with the storage location your running Airflow instance relies on:
gsutil cp -r ./dags* gs://$airflow_gcs_bucket_dev/dags/airflow_dag_repo/
In our case, our Airflow instance runs on Cloud Composer, the managed Airflow service
of Google Cloud Platform, which uses Cloud Storage
as its storage service. Hence the use of gsutil, which lets us access Cloud Storage
(where our DAGs/ folder is located).
The second-best option would be to integrate this command into your GitLab CI/CD pipeline.
However, the most advanced and preferred option (the ultimate no-brainer, really) would be to use Terraform.
Run the extra mile
You want to reduce the workload the Airflow parser has to deal with. To improve parsing time, ship zip-compressed files and remove unnecessary noise, e.g. by pruning away cache folders:
zip -r airflow_dag_repo.zip ./dags ./airflow_dag_repo
gsutil cp -r airflow_dag_repo.zip gs://$airflow_gcs_bucket_dev/dags/
With Terraform, the sync and the cache pruning are declared as follows:
locals {
  dags = fileset("${var.root_dir}/../../dags/", "**")
  code = toset([
    for file in fileset("${var.root_dir}/../../airflow_dag_repo/", "**") :
    file if length(regexall(".*__pycache__.*", file)) == 0
  ])
  upload_folder = replace(var.service_name, "-", "_")
}
resource "google_storage_bucket_object" "dags" {
for_each = local.dags
name = "dags/${local.upload_folder}/${each.key}"
source = "${var.root_dir}/../../dags/${each.key}"
bucket = data.google_secret_manager_secret_version.airflow_bucket.secret_data
}
resource "google_storage_bucket_object" "code" {
for_each = local.code
name = "dags/${local.upload_folder}/${each.key}"
source = "${var.root_dir}/../../airflow_dag_repo/${each.key}"
bucket = data.google_secret_manager_secret_version.airflow_bucket.secret_data
}
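The bucket name and the two variables referenced above are not declared in this snippet. A minimal sketch of what those declarations could look like, assuming the Composer bucket name is stored in a Secret Manager secret (the secret id "airflow-bucket" is a placeholder):
# Hypothetical declarations backing the resources above.
data "google_secret_manager_secret_version" "airflow_bucket" {
  secret = "airflow-bucket" # assumed secret holding the Composer bucket name
}

variable "root_dir" {
  type        = string
  description = "Directory from which the ../../dags and ../../airflow_dag_repo paths are resolved"
}

variable "service_name" {
  type        = string
  description = "Repository name, used to build the upload folder under dags/"
}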
Note: the service_name is inferred by Terragrunt:
locals {
  git_remote_origin_url = run_cmd("--terragrunt-quiet", "git", "config", "--get", "remote.origin.url")
  service_name          = run_cmd("--terragrunt-quiet", "basename", "-s", ".git", local.git_remote_origin_url)
}
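How the inferred service_name (and root_dir) reaches the Terraform variables used above is not shown; one hypothetical way to wire it, written as a self-contained environment-level file rather than the exact layout used here:
# Hypothetical terragrunt/dev/terragrunt.hcl: infer the service name from git
# and forward it, together with a root_dir, to the Terraform variables.
terraform {
  source = "../../terraform"
}

locals {
  git_remote_origin_url = run_cmd("--terragrunt-quiet", "git", "config", "--get", "remote.origin.url")
  service_name          = run_cmd("--terragrunt-quiet", "basename", "-s", ".git", local.git_remote_origin_url)
}

inputs = {
  service_name = local.service_name
  # get_terragrunt_dir() resolves to <repo>/terragrunt/dev here, so the ../../
  # prefixes in the Terraform code point back to the repository root.
  root_dir = get_terragrunt_dir()
}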
The repository ends up with the following layout:
airflow-dag-repo
├── dags
│   └── dag.py
├── airflow_dag_repo
│   ├── __init__.py
│   └── utils.py
├── terraform
│   ├── sync_dags.tf
│   ├── tf_variables.tf
│   └── versions.tf
├── terragrunt
│   ├── dev
│   │   ├── env.hcl
│   │   ├── env.tfvars
│   │   └── terragrunt.hcl
│   ├── prod
│   │   ├── env.hcl
│   │   ├── env.tfvars
│   │   └── terragrunt.hcl
│   └── terragrunt.hcl
├── poetry.lock
└── pyproject.toml
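For completeness, the terraform/versions.tf listed above would at least pin the Google provider behind the bucket objects and the Secret Manager lookup. A minimal sketch, with placeholder version constraints:
# Hypothetical terraform/versions.tf; the constraints below are assumptions.
terraform {
  required_version = ">= 1.3"

  required_providers {
    google = {
      source  = "hashicorp/google"
      version = ">= 4.50"
    }
  }
}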