Python Itertools Cycle

The itertools.cycle() function is a nice way to iterate through an iterable – e.g. a list – indefinitely, in a cycling way. When all the elements are exhausted, they are read once again from the beginning.

from itertools import cycle

duty_persons = ["Olivier", "David", "Goran",]
duty_persons_cycle = cycle(duty_persons)

for _ in range(6):
    print(next(duty_persons_cycle))

The above snippet will return the following output:

Olivier
David
Goran
Olivier
David
Goran

As you can see in the above example, the cycle() function is quite helpful in a couple of situations:

  • establishing a rolling list of duty-persons, rotating on a regular basis;
  • cycling between hosts when scraping the web, to work around anti-bot policies (see the sketch below);
  • any use-case you might think of…
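
To illustrate the second bullet, here is a minimal sketch of how cycle() could rotate through a pool of proxy hosts while scraping; the proxy addresses, URLs and the use of the requests library are purely illustrative assumptions:

from itertools import cycle

import requests

# hypothetical pool of proxy hosts to rotate through
proxies = ["http://proxy-1:8080", "http://proxy-2:8080", "http://proxy-3:8080"]
proxy_pool = cycle(proxies)

urls_to_scrape = ["https://example.com/page-1", "https://example.com/page-2"]

for url in urls_to_scrape:
    proxy = next(proxy_pool)  # next host in the rotation, wrapping around forever
    response = requests.get(url, proxies={"http": proxy, "https": proxy})
    print(url, response.status_code)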

Upload Airflow DAG to DAGs folder (hosted on Google Cloud Composer)

Once you have your DAG defined in a nice dag.py file, you need to transfer it from your local environment to the DAGs/ folder of your Airflow instance.

airflow-dag-repo
├── dags
│   └── dag.py
├── airflow_dag_repo
│   ├── __init__.py
│   └── utils.py
├── poetry.lock
└── pyproject.toml

This can be done manually via the Command Line Interface, replacing the Gitlab $airflow_gcs_bucket_dev environment variable with the storage location your running Airflow instance relies on:

gsutil cp -r ./dags* gs://$airflow_gcs_bucket_dev/dags/airflow_dag_repo/

In our case, our Airflow instance runs on the Google Cloud Composer service from Google Cloud Platform and uses Cloud Storage as its storage service. Hence the use of gsutil, which lets us access Cloud Storage (where our DAGs/ folder is located).

The second-best option would be to integrate this command into your Gitlab CI/CD pipeline.

However, the most advanced and preferred option (like, the ultimate no-brainer) would be to use Terraform.

Go the extra mile

You want to reduce the workload the Airflow parser has to deal with. Thus, to improve the parsing time, send zip-compressed files and remove unnecessary noise, e.g. by pruning away cache folders:

zip -r airflow_dag_repo.zip ./dags ./airflow_dag_repo
gsutil cp -r airflow_dag_repo.zip gs://$airflow_gcs_bucket_dev/dags/

With Terraform, the preferred option mentioned above, the upload can be declared as follows:

locals {
    dags = fileset("${var.root_dir}/../../dags/", "**")
    code = toset([
        for file in fileset("${var.root_dir}/../../airflow_dag_repo/", "**"):
            file if length(regexall(".*__pycache__.*", file)) == 0
    ])
    upload_folder = replace(var.service_name, "-", "_")
}

resource "google_storage_bucket_object" "dags" {
    for_each = local.dags
    name   = "dags/${local.upload_folder}/${each.key}"
    source = "${var.root_dir}/../../dags/${each.key}"
    bucket = data.google_secret_manager_secret_version.airflow_bucket.secret_data
}

resource "google_storage_bucket_object" "code" {
    for_each = local.code
    name   = "dags/${local.upload_folder}/${each.key}"
    source = "${var.root_dir}/../../airflow_dag_repo/${each.key}"
    bucket = data.google_secret_manager_secret_version.airflow_bucket.secret_data
}

Note: the service_name is inferred by terragrunt:

locals {
    git_remote_origin_url = run_cmd("--terragrunt-quiet", "git", "config", "--get", "remote.origin.url")
    service_name = run_cmd("--terragrunt-quiet", "basename", "-s", ".git", local.git_remote_origin_url)
}

The resulting repository structure looks like this:

airflow-dag-repo
├── dags
│   └── dag.py
├── airflow_dag_repo
│   ├── __init__.py
│   └── utils.py
├── terraform
│   ├── sync_dags.tf
│   ├── tf_variables.tf
│   └── versions.tf
├── terragrunt
│   ├── dev
│   │   ├── env.hcl
│   │   ├── env.tfvars
│   │   └── terragrunt.hcl
│   ├── prod
│   │   ├── env.hcl
│   │   ├── env.tfvars
│   │   └── terragrunt.hcl
│   └── terragrunt.hcl
├── poetry.lock
└── pyproject.toml

Fix Airflow Pylance reportMissingImports

The Airflow Pylance reportMissingImports error can be fixed by installing the apache-airflow python library within your virtual environment and then pointing Visual Studio Code (or any other IDE) to the path where it can find it.

If you are using poetry as python package manager (more on poetry here) and Visual Studio Code (VSC):

  1. Run

    poetry add apache-airflow
    
  2. Find the path where the poetry virtual environment is located on your system:

    poetry env info --path
    
  3. On VSC, open the Command Palette, then >Python: Select Interpreter and enter the path returned in the above command.

After this is done, you will not have those errors anymore:

Import "airflow" could not be resolved Pylance reportMissingImports

Note: this method remains valid for all the reportMissingImports errors you might encounter.

Airflow DAG labels

On Airflow you can label dependencies between the tasks using the Label() object.

from airflow.utils.edgemodifier import Label

task_A >> Label("Transform") >> task_B

This becomes handy when you explicitly want to document the edges of your DAG (Directed Acyclic Graph), e.g. to indicate what is happening between the tasks.

Minimal Functional Example

from airflow import DAG
from airflow.decorators import task
from airflow.utils.edgemodifier import Label

from datetime import datetime

with DAG(
    dag_id="test-airflow-dag-label",
    start_date=datetime(2023, 1, 23),
    schedule_interval="@daily",
    catchup=False,
    tags=["test"],
) as dag:

    @task
    def task_A():
        ...

    @task
    def task_B():
        ...

    task_A() >> Label("Move data from A to B") >> task_B()

Note: the ... (ellipsis literal) is equivalent to pass.

Python uuid

uuid is a python module providing immutable UUID (Universally Unique IDentifier URN Namespace) objects and functions e.g. uuid4().

The generated UUID objects are effectively unique: depending on the function used, the IDs are derived from the computer hardware and the current time (uuid1()) or from random numbers (uuid4()).

python> from uuid import uuid4
python> uuid_object = uuid4()
python> uuid_object
UUID('ce8d1fee-bc31-406b-8690-94c01caabcb6')

python> str(uuid_object)
'ce8d1fee-bc31-406b-8690-94c01caabcb6'

Those can be used to generate random strings that can serve as unique identifiers across a given namespace – e.g. if you want to generate temporary filenames on the fly under a specific folder:

for _ in range(10):
    file = open(f"/tmp/{uuid4()}.txt", "a")
    file.write("hello world!")
    file.close()

Note: despite the above snippet working like a charm, it is better to use open() with the with context manager to make sure the close() function will always be called, should an error occur during the write operation.
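
For the record, here is the same snippet rewritten with the context manager:

from uuid import uuid4

for _ in range(10):
    # the file is closed automatically when leaving the with block,
    # even if the write operation raises an error
    with open(f"/tmp/{uuid4()}.txt", "a") as file:
        file.write("hello world!")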

Hazardous: uuid1() compromises privacy since it uses the network address of the computer to generate the unique ID. It could thus be reverse-engineered and retrieved. To prevent this, always choose one of the later functions, e.g. uuid4() or uuid5(), as the earlier ones rapidly get deprecated.
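
Note also that, unlike uuid4(), uuid5() is deterministic: it derives the identifier from a namespace and a name, so the same inputs always yield the same UUID. A minimal sketch:

python> from uuid import NAMESPACE_URL, uuid5
python> uuid5(NAMESPACE_URL, "https://olivierbenard.fr") == uuid5(NAMESPACE_URL, "https://olivierbenard.fr")
True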

Note: to know more about URIs, URLs and URNs https://olivierbenard.fr/change-the-url-of-a-git-repository/.

Python extend vs. append

  • append() appends the given object at the end of the list. The length is only incremented by 1.
  • extend() extends the list by appending the elements it contains one by one to the list. The length is incremented by the number of elements.

In more details

The append() method is straightforward:

python> a = [1,2,3]
python> a.append(4)
python> a
[1, 2, 3, 4]

However, what happens if you need to add multiple elements at the same time? You could call the append() method several times, but would it work to pass the list of new elements directly as a parameter? Let's give it a try:

python> a = [1,2,3]
python> a.append([4,5])
python> a
[1, 2, 3, [4, 5]] # vs. expected [1, 2, 3, 4, 5]

The extend() method intends to solve this problem:

python> b = [1,2,3]
python> b.extend([4,5])
python> b
[1, 2, 3, 4, 5]

Note: the += operator is equivalent to an extend call.

python> a = [1,2,3]
python> a += [4,5]
python> a
[1, 2, 3, 4, 5]

Difference between datalake, datapool and datamart

Once the data is loaded into the data warehouse, it can be stored in different environments:

  • datalake: protected staging environment with access limited to Data Engineers only. It receives raw data from the ingesting pipelines. Typically, tables there only contain a few columns, e.g. content (containing stringified json) and the ingested_at ISO 8601 timestamp.

  • datapool-pii: structured/prepared environment where data from the datalake is parsed and technical transformations are applied. Typically, the previous content field is parsed (e.g. using JSON_EXTRACT() and similar functions on BigQuery) and split into different fields. Accredited stakeholders can directly pick data from there. If sensitive information was present in the data, it is also to be found in this environment.

  • datapool: same as the datapool-pii environment but with the sensitive information removed.

  • datamarts: environments aggregating the data into integrated business objects and aggregations for metrics, intelligence, science and analytics, intended for stakeholders. You can see them as data business products. Generally, those tables are owned by an expert group, e.g. the Data Modellers and Business Intelligence team.

Note: PII stands for Personally Identifiable Information.

End-to-end Data Journey Example

Let’s examine an end-to-end business case to see how the different environments intervene into the general workflow.

Let's say that you have the following json data containing some statistics per active user (e.g. on an app or website):

{
    "active_users": [
        {
            "id": "12e57e",
            "email": "john.doe@gmail.com",
            "results": [
                {
                    "stats": {"views": 12587, "spend": 8000}
                }
            ]
        },
        {
            "id": "r87e5z",
            "email": "jane.doe@gmail.com",
            "results": [
                {
                    "stats": {"views": 97553, "spend": 10000}
                }
            ]
        },
        {
            "id": "8tr75z",
            "email": "johnny.doe@gmail.com",
            "results": [
                {
                    "stats": {"views": 41239, "spend": 5000}
                }
            ]
        }
    ]
}

You ingested the above json data into the following raw table on datalake:

content ingested_at
{"active_users":[{"id":"12e57e","email":"john.doe@gmail.com","results":[{"stats":{"views":12587,"spend":8000}}]},{"id":"r87e5z","email":"jane.doe@gmail.com","results":[{"stats":{"views":97553,"spend":10000}}]},{"id":"8tr75z","email":"johnny.doe@gmail.com","results":[{"stats":{"views":41239,"spend":5000}}]}]} 2022-12-14T04:00:00.000 UTC

Note: content being a STRING and ingested_at a TIMESTAMP.

The table is then parsed and stored into the datapool-pii environment:

id email views spend
12e57e john.doe@gmail.com 12587 8000
r87e5z jane.doe@gmail.com 97553 10000
8tr75z johnny.doe@gmail.com 41239 5000

This table contains sensitive information (i.e. PII data). Thus, before granting access to stakeholders, one must hide or hash the sensitive fields:

id email views spend
12e57e 375320dd9ae7ed408002f3768e16cb5f28c861062fd50dff9a3bff62e9dce4ef 12587 8000
r87e5z 831f6494ad6be4fcb3a724c3d5fef22d3ceffa3c62ef3a7984e45a0ea177f982 97553 10000
8tr75z 980b542e198802ebe4d57690a1ad3e587b5751cb537209112c0564b52bd0699f 41239 5000

The above table is stored on datapool.

Then, on datamart you will find this table aggregated with other metrics and measurements, ready to be used and consumed by the vertical teams.

To go even further

To parse the raw json content field, you can use an Airflow job that will regularly execute the following SQL query:

{% set datalake_project = var.value.datalake_project %}

WITH
  raw_lake_table AS (
  SELECT
    JSON_EXTRACT_ARRAY(content, '$.active_users') AS active_users,
    ingested_at as ingested_at
  FROM
    `{{datalake_project}}.dataset.table`
  WHERE DATE(ingested_at) = "{{ds}}"
  )
SELECT
  JSON_EXTRACT_SCALAR(unnested_active_users, '$.id') as id,
  JSON_EXTRACT_SCALAR(unnested_active_users, '$.email') as email,
  JSON_EXTRACT(results, '$.stats.views') as views,
  JSON_EXTRACT(results, '$.stats.spend') as spend,
  ingested_at
FROM
  raw_lake_table,
  UNNEST(active_users) as unnested_active_users
  LEFT JOIN UNNEST(JSON_EXTRACT_ARRAY(unnested_active_users, '$.results')) as results

Note: You can use Jinja templated variables to parametrize your query. In our case, {{datalake_project}} will be replaced at execution time by datalake-prod or datalake-dev. In the same manner, {{ds}} will be replaced by the current YYYY-MM-DD date. See Airflow Templates Reference for more information.

To hash using sha256 on python:

python3> import hashlib
python3> hashlib.sha256("your-string".encode('utf-8')).hexdigest()
'42c23acbf1b1c471c7b53efe58f34dea361d941f47584265df5dbaec1bfddd49'

On BigQuery, you can directly use the SHA256 function to hash your fields in your data-pipelines:

SELECT SHA256("your-string") as sha256;
+----------------------------------------------+
| sha256                                       |
+----------------------------------------------+
| QsI6y/GxxHHHtT7+WPNN6jYdlB9HWEJl31267Bv93Uk= |
+----------------------------------------------+

Caution: The return type is BYTES in base64 format. If you want to compare it with the Python value returned above, then use TO_HEX:

SELECT TO_HEX(SHA256("your-string")) as sha256;
+------------------------------------------------------------------+
| sha256                                                           |
+------------------------------------------------------------------+
| 42c23acbf1b1c471c7b53efe58f34dea361d941f47584265df5dbaec1bfddd49 |
+------------------------------------------------------------------+

Note: Even better, use SHA512.
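
On the Python side, switching to SHA512 is a one-word change with hashlib, mirroring the SHA256 example above; the resulting hexadecimal digest is 128 characters long instead of 64:

python3> import hashlib
python3> hashlib.sha512("your-string".encode('utf-8')).hexdigest()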

Why stop using SFTP

You should stop using SFTP as it is an outdated technology that also comes with its share of disadvantages. The main problems with SFTP are the following:

  1. You have to keep the servers secure (network security, access control, …). It adds extra load on top of your list of duties.

  2. SFTP servers only do an SFTP-to-bucket dump, which would be unnecessary if we used buckets directly.

  3. As for Google Cloud Platform, SFTP servers mount GCP buckets as file systems in a Docker container. This can be done but, depending on the load sent to the SFTP server, it can become a problem (volume limitations).

Note: SFTP can be used as an interface to pass on requests between the user and the bucket it mirrors. It uses user-friendly command lines, see SFTP basic CLI commands survival kit.

Alternative to SFTP

Besides providing a friendly interface to our customers (e.g. if they use FileZilla), why is using SFTP a no-go? Preaching against it sounds like we are fighting against hell. Why are Data Software Engineers so adamant about it? At the end of the day, do we know why it is an issue? It is not best practice, but is it that hurtful?

To give you the short answer: there is nothing wrong with SFTP in general, it is just an outdated technology.

On the contrary, there are multiple benefits to using buckets directly (see the sketch after the list):

  1. Bucket security is done by GCP (of course, we have to do the access control – which is not so hard).

  2. The interface is also easy since you can see the bucket contents in the GCP console (no need for external tools like FileZilla).

  3. Buckets can handle a high amount of parallel reads.
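
As a sketch of how simple the direct-to-bucket approach is, uploading a file with the google-cloud-storage Python client only takes a few lines; the bucket and file names below are hypothetical and default application credentials are assumed:

from google.cloud import storage

client = storage.Client()  # picks up the default application credentials
bucket = client.bucket("my-bucket")  # hypothetical bucket name
blob = bucket.blob("incoming/report.csv")  # destination path inside the bucket
blob.upload_from_filename("./report.csv")  # local file to upload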

It is not always possible though

As stated in the Fundamentals of Data Engineering book published by O’REILLY:

Data engineers rightfully cringe at the mention of SFTP. Regardless, SFTP is still a practical reality for many businesses. They work with partner businesses that consume or provide data using SFTP and are unwilling to rely on other standards.

To conclude, try to get rid of SFTP legacies but don't get your knickers in a twist over it. 👟

How to create custom Airflow Operators

Disclaimer: This article is intended for users who already have some hands-on experience with Airflow. If this is not the case, I am working on an Airflow Essentials Survival Kit guide to be released. The link will be posted here as soon as it is available.

To create custom Airflow Operators, all you need is to import the Airflow BaseOperator and subclass it, adding the parameters and logic you need. You just have to fill in the template below:

from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults

class MyCustomOperator(BaseOperator):

    # add the templated param(s) your custom operator takes:
    template_fields = ("your_custom_templated_field", )

    @apply_defaults
    def __init__(
        self,
        your_custom_field,
        your_custom_templated_field,
        *args,
        **kwargs
    ):
        super().__init__(*args, **kwargs)

        # assign the normal and templated params:
        self.your_custom_field = your_custom_field
        self.your_custom_templated_field = your_custom_templated_field

    def execute(self, context):
        # the logic to perform
        # ...

Note: The execute() method and context argument are mandatory.

Then, assuming you store this module in another file, you can call it inside your DAG file:

from path.to.file import MyCustomOperator

my_custom_job = MyCustomOperator(
    task_id="my_custom_operator",
    your_custom_templated_field=f'E.g. {"{{ds}}"}',
    your_custom_field=42,
)

Notes:

  • Because you have registered it in the template_fields option, your_custom_templated_field accepts Jinja Templated Variables like {{ds}}. You do not necessarily need to use this option though.
  • You can have more than one custom field.
  • Not all job’s parameters accept Jinja Templated values. Look up in the documentation which are the accepted templated ones. E.g. for BigQueryInsertJobOperator.

Example: Ingesting Data From API

Context: you have data sitting on an API. You want to fetch data from this API and ingest it into Google Cloud BigQuery. In between, you store the fetched raw data as temporary json files inside a Google Cloud Storage bucket. The next step is then to flush the files' content into a BigQuery table.

In order to do so, you need to create a custom Airflow Operator that uses your API client to fetch the data, retrieving the credentials from an Airflow Connection, and stores the retrieved data in a temporary json file on Google Cloud Storage.

Your custom-made Airflow Operator, stored in a file like api_ingest/core.py, will look like:

import json
from typing import Any
from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.hooks.base_hook import BaseHook
from custom_api_client.client import (
    CustomAPIClient,
)

class APIToGCSOperator(
    BaseOperator
): # pylint: disable=too-few-public-methods
    """
    Custom-made Airflow Operator fetching data from the API.
    """

    template_fields = (
        "destination_file",
        "ingested_at"
    )

    @apply_defaults
    def __init__(
        self,
        destination_file: str,
        ingested_at: str,
        entity: str,
        *args: Any,
        **kwargs: Any
    ) -> None:
        super().__init__(*args, **kwargs)

        self.destination_file = destination_file
        self.ingested_at = ingested_at
        self.entity = entity

    def execute(
        self,
        context: Any
    ) -> None:  # pylint: disable=unused-argument
        """
        Fetches data from the API and writes into GCS bucket.

        1. Reads the credentials stored into the Airflow Connection.
        2. Instantiates the API client.
        3. Fetches the data with the given parameters.
        4. Flushes the result into a temporary GCS bucket.
        """

        api_connection = BaseHook.get_connection("your_connection_name")
        credentials = api_connection.extra_dejson
        client_id = credentials["client_id"]
        client_secret = credentials["client_secret"]
        refresh_token = credentials["non_expiring_refresh_token"]

        custom_api_client = CustomAPIClient(
            client_id = client_id,
            client_secret = client_secret,
            refresh_token = refresh_token,
        )

        with open(
            self.destination_file,
            "w",
            encoding="utf-8"
        ) as output:

            fetched_data_json = custom_api_client.fetch_entity(
                entity=self.entity
            )

            entity_json = dict(
                content = json.dumps(
                    fetched_data_json,
                    default=str,
                    ensure_ascii=False
                    ),
                ingested_at = self.ingested_at,
            )

            json.dump(
                entity_json,
                output,
                default=str,
                ensure_ascii=False
            )
            output.write("\n")

Note: You can create your own custom-made API clients. To make sure yours is available inside your Airflow DAG, upload the package into the plugins Airflow folder beforehand.

In the main DAG’s module, inside the DAG’s with context manager, it will look like:

from airflow.operators.bash import BashOperator
from api_ingest.core import APIToGCSOperator

staging_dir = "{{var.value.local_storage_dir}}" + "/tempfiles/api_ingest"

create_staging_dir = BashOperator(
    task_id=f"create-staging-dir",
    bash_command=f"mkdir -p {staging_dir}"
)

cleanup_staging_dir = BashOperator(
    task_id=f"cleanup-staging-dir",
    bash_command=f"rm -rf {staging_dir}"
)

api_to_gcs = APIToGCSOperator(
    task_id = "api-to-gcs",
    destination_file = f'{staging_dir}/data_{"{{ds_nodash}}"}.mjson',
    ingested_at = "{{ts}}",
    entity = "campaigns",
)

Note: It is more efficient to use {{var.value.your_variable}} instead of Variable.get("your_variable"). The downsides are: the real value is only going to be replaced at execution time, and only for the fields accepting Jinja Templated variables.
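
To make the trade-off concrete, here is a minimal sketch of both patterns, assuming an Airflow Variable named local_storage_dir exists:

from airflow.models import Variable

# evaluated at every DAG parse, adding load on the parser and the metadata database:
staging_dir = Variable.get("local_storage_dir") + "/tempfiles/api_ingest"

# rendered only at execution time, but only inside fields accepting Jinja templates:
staging_dir = "{{var.value.local_storage_dir}}" + "/tempfiles/api_ingest"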

And here we go, you should now be able to create your own custom Airflow Operators! Have fun crafting Airflow and tuning it to your liking. 💫

Python Walrus Operator

The walrus operator := is a nice way to avoid repetitions of function calls and statements. It simplifies your code. You can compare the two following code snippets:

grades = [1, 12, 14, 17, 5, 8, 20]

stats = {
    'nb_grades': len(grades),
    'sum': sum(grades),
    'mean': sum(grades)/len(grades)
}

versus, using the walrus operator:

grades = [1, 12, 14, 17, 5, 8, 20]

stats = {
    'nb_grades': (nb_grades := len(grades)),
    'sum': (sum_grades := sum(grades)),
    'mean': sum_grades/nb_grades
}

Note: The parentheses around the assignment expressions are mandatory for this to work.

Same goes for function calls:

foo = "hello world!"
if (n := len(foo)) > 4:
    print(f"foo is of length {n}")
foo is of length 12

In the above snippet, the len() method has only been called once instead of twice. More generally, you can assign values to variables on the fly without having to call the same methods more than once.

Important: The python walrus operator := (officially known as the assignment expression operator) was introduced in Python 3.8. This means that, once you use it, your code won't be backward compatible anymore.

Note: The "walrus operator" affectionate appellation is due to its resemblance to the eyes and tusks of a walrus.

More examples

python> [name for _name in ["OLIVIER", "OLIVIA", "OLIVER"] if "vi" in (name := _name.lower())]
['olivier', 'olivia']

In this example, we are iterating through the list, storing each item of the list into the temporary _name variable. Then, we apply the lower() string method on the _name object, turning the upper case string into lower case. Next, we store the lower case value into the name variable using the walrus operator. Finally, we filter the values using the predicate to only keep the names containing “vi”.

The alternative without the walrus operator would have been:

python> [name.lower() for name in ["OLIVIER", "OLIVIA", "OLIVER"] if "vi" in name.lower()]
['olivier', 'olivia']

As you can see, the above code snippet is less "optimized" as you call the lower() method twice on the same object.

You can also use the walrus operator without performing any filtering. The following code works like a charm:

python> [name for _name in ["OLIVIER", "OLIVIA", "OLIVER"] if (name := _name.lower())]
['olivier', 'olivia', 'oliver']

However, this is highly counter-intuitive and invites errors. The presence of if lets the reader assume that there is a conditional check and filtering in place, which is not the case. The codebase does not benefit from such a design since the following snippet – on top of being clearer – is strictly equivalent in terms of outcome:

python> [name.lower() for name in ["OLIVIER", "OLIVIA", "OLIVER"]]
['olivier', 'olivia', 'oliver']

Note: Developers tend to be very smart people. We sometimes like to show off our smarts by demonstrating our mental juggling abilities. Resist the temptation of writing complex code. Programming is a social activity (e.g. all the collaborative open source projects). Be professional and keep the nerdy brilliant workarounds for your personal projects. Follow the KISS principle: Keep It Simple, Stupid. Clarity is all that matters. You want to maximize the codebase discoverability.

Last but not least, you can also use the walrus operator inside while conditional statements:

while (answer := int(input())) != 42:
    print("This is not the Answer to the Ultimate Question of Life, the Universe, and Everything.")

shell> python script.py
7
This is not the Answer to the Ultimate Question of Life, the Universe, and Everything.
3
This is not the Answer to the Ultimate Question of Life, the Universe, and Everything.
42

Note: In the above examples we have used list comprehension. This article (in progress) explains this design in more detail.

One more thing

You cannot do a plain assignment with the walrus operator. At least, it is not that easy:

python> a := 42
  File "<stdin>", line 1
    a := 42
      ^^
SyntaxError: invalid syntax

For the above code snippet to work, you need to enclose the assignment expression in parentheses:

python> a = 42
python> (a := 18)
18
python> print(a)
18

Who told you that Software Developers were not sentimental? ❤️