Python positional-only parameters

Positional-only parameters were introduced in Python 3.8. They are a way to specify which parameters must be passed positionally – i.e. which parameters cannot be given as keyword arguments. The syntax uses a / in the parameter list: every parameter to the left of the / becomes positional-only.

def f(a, b, /, c): ...

The above function only accepts calls of the following form:

python> f(1, 2, 3)
python> f(1, 2, c=3)

Note: c can be given either as a positional or as a keyword argument.

By contrast, the following call raises a TypeError: f() got some positional-only arguments passed as keyword arguments: 'b', because b – being on the left of / – cannot be passed as a keyword argument:

python> f(1, b=2, c=3)

The same applies to a. Calls such as f(1, b=2, 3) or f(a=1, b=2, 3) fail even earlier with a SyntaxError, since a positional argument can never follow a keyword argument.

Use cases

The / positional-only parameter syntax is helpful in the following use cases:

  • It prevents keyword arguments from being used when the parameter’s name is unnecessary, confusing or misleading, e.g. len(obj=my_list). As stated in the documentation, the obj keyword argument would impair readability here.

  • It allows the parameter’s name to be changed in the future without breaking client code, since callers can still pass the value by keyword through **kwargs – making it accessible in two ways, as shown below:

def transform(name, /, **kwargs):
    print(name.upper())

In the above snippet, name is positional-only, so a caller may still pass a keyword argument called name, which lands in kwargs. Inside the function you can therefore access the positional value via name and the keyword one via kwargs.get("name").
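
For instance (a quick sketch to make both access paths explicit), a caller can supply a positional value and a name keyword at the same time:

def transform(name, /, **kwargs):
    # name holds the positional value; a keyword "name" is collected in kwargs
    print(name.upper(), kwargs.get("name"))

transform("john")               # prints: JOHN None
transform("john", name="jane")  # prints: JOHN jane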

Last but not least, it can also be used to prevent pre-set keyword arguments from being overwritten in functions that still need to accept arbitrary keyword arguments (via kwargs):

def initialize(name="unknown", /, **kwargs):
    print(name, kwargs)

The name parameter is protected and cannot be overwritten, even if a name key is mistakenly passed a second time and captured by kwargs:

python> initialize(name="olivier")
unknown {'name': 'olivier'}

Notes:

  • In a call, Python does not allow positional arguments after keyword arguments; such calls raise a SyntaxError.
  • Symmetrically to /, parameters declared after a bare * in the parameter list are keyword-only (see the sketch below).
  • *args collects the extra positional arguments.
  • **kwargs collects the extra keyword arguments.
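
A minimal sketch combining both markers (using a hypothetical function g, for illustration only):

def g(a, /, b, *, c):
    # a is positional-only, b can be passed either way, c is keyword-only
    print(a, b, c)

g(1, 2, c=3)    # OK
g(1, b=2, c=3)  # OK
# g(1, 2, 3)    # TypeError: g() takes 2 positional arguments but 3 were given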

base64 encoding via CLI

Simply use base64 and base64 -d:

zsh> echo -n "username:password"|base64
dXNlcm5hbWU6cGFzc3dvcmQ=

Note: the -n option prevents echo from printing a trailing newline (you do not want to encode \n).

zsh> echo dXNlcm5hbWU6cGFzc3dvcmQ=|base64 -d
username:password

base64 encoding is widely used on the web. It is mainly used to carry binary data across channels that only support text content (e.g. HTTP requests or e-mail attachments). In the context of data exchange through APIs, it is broadly used during authentication to include your credentials in the headers:

{
    "Authorization": "Basic <Base64Encoded(username:password)>"
}

Using curl:

curl https://your-url \
    -H "Authorization: Basic <base64-encoded credentials>"

The encoded credentials being most often your colon-separated username:password pair, base64 encoded (or any other credentials the API specifies, e.g. a Bearer token).
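
A minimal Python sketch of building and sending such a header (assuming the requests package is installed and a hypothetical https://your-url endpoint):

import base64

import requests

# encode the colon-separated credentials, then turn the bytes back into a str
credentials = base64.b64encode(b"username:password").decode()
response = requests.get(
    "https://your-url",
    headers={"Authorization": f"Basic {credentials}"},
)
print(response.status_code)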

Using Python

Base64 encoding/decoding with Python:

python> import base64
python> encoded = base64.b64encode(b"username:password")
python> print(encoded)
b'dXNlcm5hbWU6cGFzc3dvcmQ='

python> decoded = base64.b64decode(b'dXNlcm5hbWU6cGFzc3dvcmQ=')
python> print(decoded)
b'username:password'

Notes:

  • With the aforementioned Python code, you will then have to decode the returned bytes object (b'string') into a str. This can be done using the .decode() method:
b'your-string'.decode()
  • If you are using VSC (Visual Studio Code), you can also encode/decode on the fly using an extension, e.g. vscode-base64.

Python Pickle Serialization

pickle allows you to serialize and de-serialize Python objects to save them into a file for future use. You can then read this file and extract the stored Python objects, de-serializing them so they can be integrated back into the code’s logic.

You just need the two basic commands: pickle.dump(object, file) and pickle.load(file).

Below, a round trip example:

import pickle

FILENAME = "tmp.pickle"
original_list = ["banana", "apple", "pear"]

with open(FILENAME, "wb") as file:
    pickle.dump(original_list, file)
file.close()

with open(FILENAME, "rb") as file:
    retrieved_list = pickle.load(file)
file.close()

print(retrieved_list) # ["banana", "apple", "pear"]

Python Itertools Cycle

The itertools.cycle() function is a nice way to iterate through an iterable – e.g. a list – indefinitely, in a cycling way. When all the elements are exhausted, they are read again from the beginning.

from itertools import cycle

duty_persons = ["Olivier", "David", "Goran",]
duty_persons_cycle = cycle(duty_persons)

for _ in range(6):
    print(next(duty_persons_cycle))

The above snippet produces the following output:

Olivier
David
Goran
Olivier
David
Goran

As you can see in the above example, cycle() is quite helpful in a couple of situations:

  • establishing a rolling list of duty persons, rotating on a regular basis;
  • cycling between hosts when scraping the web, to work around anti-bot policies (see the sketch below);
  • any other use case you might think of…
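
A minimal sketch of the host-rotation idea, using hypothetical proxy host names and leaving the actual HTTP call out:

from itertools import cycle

# hypothetical pool of proxy hosts to rotate through while scraping
hosts = cycle(["proxy-1.example.com", "proxy-2.example.com", "proxy-3.example.com"])
urls = [f"https://example.com/page/{i}" for i in range(5)]

for url in urls:
    print(f"fetching {url} via {next(hosts)}")  # plug in your HTTP client here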

Upload Airflow DAG to DAGs folder (hosted on Google Cloud Composer)

Once you have your DAG defined in a nice dag.py file, you need to transfer it from your local environment to the DAGs/ folder of your Airflow instance.

airflow-dag-repo
├── dags
    └── dag.py
├── airflow_dag_repo
    ├── __init__.py
    └── utils.py
├── poetry.lock
└── pyproject.toml

This can be done manually via the command line, replacing the GitLab $airflow_gcs_bucket_dev environment variable with the storage location your running Airflow instance relies on:

gsutil cp -r ./dags* gs://$airflow_gcs_bucket_dev/dags/airflow_dag_repo/

In our case, we consider an Airflow instance running on the Google Cloud Composer service from Google Cloud Platform, with Cloud Storage as the storage service – hence the use of gsutil, which lets us access Cloud Storage (where our DAGs/ folder is located).

The second-best option would be to integrate this command into your GitLab CI/CD pipeline.

However, the most advanced and preferred option (like, the ultimate no-brainer) would be to use Terraform.

Go the extra mile

You want to reduce the workload of the Airflow DAG parser. Thus, to improve parsing time, ship zip-compressed archives and prune unnecessary noise, e.g. __pycache__ folders:

zip -r airflow_dag_repo.zip ./dags ./airflow_dag_repo -x "*__pycache__*"
gsutil cp -r airflow_dag_repo.zip gs://$airflow_gcs_bucket_dev/dags/

With Terraform, the upload of both folders can be expressed as follows (filtering out __pycache__ along the way):

locals {
    dags = fileset("${var.root_dir}/../../dags/", "**")
    code = toset([
        for file in fileset("${var.root_dir}/../../airflow_dag_repo/", "**"):
            file if length(regexall(".*__pycache__.*", file)) == 0
    ])
    upload_folder = replace(var.service_name, "-", "_")
}

resource "google_storage_bucket_object" "dags" {
    for_each = local.dags
    name   = "dags/${local.upload_folder}/${each.key}"
    source = "${var.root_dir}/../../dags/${each.key}"
    bucket = data.google_secret_manager_secret_version.airflow_bucket.secret_data
}

resource "google_storage_bucket_object" "code" {
    for_each = local.code
    name   = "dags/${local.upload_folder}/${each.key}"
    source = "${var.root_dir}/../../airflow_dag_repo/${each.key}"
    bucket = data.google_secret_manager_secret_version.airflow_bucket.secret_data
}

Note: the service_name is inferred by Terragrunt:

locals {
    git_remote_origin_url = run_cmd("--terragrunt-quiet", "git", "config", "--get", "remote.origin.url")
    service_name = run_cmd("--terragrunt-quiet", "basename", "-s", ".git", local.git_remote_origin_url)
}

The full repository layout, including the Terraform and Terragrunt configuration, then looks like this:

airflow-dag-repo
├── dags
    └── dag.py
├── airflow_dag_repo
    ├── __init__.py
    └── utils.py
├── terraform
    ├── sync_dags.tf
    ├── tf_variables.tf
    └── versions.tf
├── terragrunt
    ├── dev
        ├── env.hcl
        ├── env.tfvars
        └── terragrunt.hcl
    ├── prod
        ├── env.hcl
        ├── env.tfvars
        └── terragrunt.hcl
    └── terragrunt.hcl
├── poetry.lock
└── pyproject.toml

Fix Airflow Pylance reportMissingImports

The Airflow Pylance reportMissingImports error can be fixed by installing the apache-airflow Python library in your virtual environment and then pointing Visual Studio Code (or any other IDE) to the interpreter of that environment.

If you are using poetry as your Python package manager (more on poetry here) and Visual Studio Code (VSC):

  1. Run

    poetry add apache-airflow
    
  2. Find the path where the poetry virtual environment is located on your system:

    poetry env info --path
    
  3. In VSC, open the Command Palette, then >Python: Select Interpreter and enter the path returned by the above command.

After this is done, you will no longer see errors like:

Import "airflow" could not be resolved Pylance reportMissingImports

Note: this method remains valid for any other reportMissingImports error you might encounter.

Airflow DAG labels

In Airflow, you can label the dependencies between tasks using the Label() object.

from airflow.utils.edgemodifier import Label

task_A >> Label("Transform") >> task_B

This becomes handy when you explicitly want to document the edges of your DAG (Directed Acyclic Graph), e.g. to indicate what is happening between the tasks.

Minimal Functional Example

from airflow import DAG
from airflow.decorators import task
from airflow.utils.edgemodifier import Label

from datetime import datetime

with DAG(
    dag_id="test-airflow-dag-label",
    start_date=datetime(2023, 1, 23),
    schedule_interval="@daily",
    catchup=False,
    tags=["test"],
) as dag:

    @task
    def task_A():
        ...

    @task
    def task_B():
        ...

    task_A() >> Label("Move data from A to B") >> task_B()

Note: the ... (Ellipsis literal) is used here as a placeholder; as a statement, it is equivalent to pass.

Python uuid

uuid is a Python module providing immutable UUID objects (Universally Unique IDentifiers, as specified by the RFC 4122 URN namespace) and functions to generate them, e.g. uuid4().

The generated UUIDs are, for all practical purposes, unique: uuid1() derives an ID from the computer’s hardware address and the current time, while uuid4() relies on random numbers.

python> from uuid import uuid4
python> uuid_object = uuid4()
python> uuid_object
UUID('ce8d1fee-bc31-406b-8690-94c01caabcb6')

python> str(uuid_object)
'ce8d1fee-bc31-406b-8690-94c01caabcb6'

Those can be used to generate random strings that serve as unique identifiers within a given namespace – e.g. if you want to generate temporary filenames on the fly under a specific folder:

for _ in range(10):
    file = open(f"/tmp/{uuid4()}.txt", "a")
    file.write("hello world!")
    file.close()

Note: although the above snippet works like a charm, it is better to use open() with the with context manager, to make sure the close() function is always called should an error occur during the write operation.
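
The same loop rewritten with the context manager:

from uuid import uuid4

for _ in range(10):
    with open(f"/tmp/{uuid4()}.txt", "a") as file:  # closed automatically, even on error
        file.write("hello world!")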

Hazardous: uuid1() compromises privacy since it embeds the network address of the computer in the generated ID, which could therefore be reverse-engineered and retrieved. To prevent this, prefer uuid4() (random-based) or uuid5() (name-based hashing).
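
Note that uuid5() is deterministic: hashing the same name within the same namespace always yields the same ID (example taken from the uuid module documentation):

python> from uuid import uuid5, NAMESPACE_DNS
python> uuid5(NAMESPACE_DNS, "python.org")
UUID('886313e1-3b8a-5372-9b90-0c9aee199e5d')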

Note: to know more about URIs, URLs and URNs https://olivierbenard.fr/change-the-url-of-a-git-repository/.

Python extend vs. append

  • append() appends the given object at the end of the list. The length is incremented by 1.
  • extend() extends the list by appending each element of the given iterable, one by one. The length is incremented by the number of elements.

In more details

The append() method is straightforward:

python> a = [1,2,3]
python> a.append(4)
python> a
[1, 2, 3, 4]

However, what happens if you need to add multiple elements at once? You could call the append() method several times, but would it work to pass the list of new elements directly as a parameter? Let’s give it a try:

python> a = [1,2,3]
python> a.append([4,5])
python> a
[1, 2, 3, [4, 5]] # vs. expected [1, 2, 3, 4, 5]

The extend() method intends to solve this problem:

python> b = [1,2,3]
python> b.extend([4,5])
python> b
[1, 2, 3, 4, 5]

Note: the += operator is equivalent to an extend() call.

python> a = [1,2,3]
python> a += [4,5]
python> a
[1, 2, 3, 4, 5]
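
Note also that extend() accepts any iterable, not just lists:

python> c = [1, 2, 3]
python> c.extend((4, 5))   # a tuple works too
python> c.extend("ab")     # a string is iterated character by character
python> c
[1, 2, 3, 4, 5, 'a', 'b']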

Difference between datalake, datapool and datamart

Once the data is loaded into the data warehouse, it can be stored in different environments:

  • datalake: protected staging environment with access limited to Data Engineers only. It receives raw data from the ingestion pipelines. Typically, tables there contain only a few columns, e.g. content (holding stringified JSON) and the ingested_at ISO 8601 timestamp.

  • datapool-pii: structured/prepared environment where data from the datalake is parsed and technical transformations are applied. Typically, the previous content field is parsed (e.g. using JSON_EXTRACT()-like DML functions on BigQuery) and split into different fields. Accredited stakeholders can pick data directly from there. Any sensitive information present in the data is still available in this environment.

  • datapool: same as the datapool-pii environment, but with the sensitive information removed.

  • datamarts: environments aggregating the data into integrated business objects and aggregations for metrics, intelligence, science and analytics, intended for stakeholders. You can see them as data business products. Generally, those tables are owned by an expert group, e.g. the Data Modellers and Business Intelligence team.

Note: PII stands for Personally Identifiable Information.

End-to-end Data Journey Example

Let’s examine an end-to-end business case to see how the different environments fit into the general workflow.

Let’s say that you have the following json data containing some statistics per active user (e.g. on an app or website):

{
    "active_users": [
        {
            "id": "12e57e",
            "email": "john.doe@gmail.com",
            "results": [
                {
                    "stats": {"views": 12587, "spend": 8000}
                }
            ]
        },
        {
            "id": "r87e5z",
            "email": "jane.doe@gmail.com",
            "results": [
                {
                    "stats": {"views": 97553, "spend": 10000}
                }
            ]
        },
        {
            "id": "8tr75z",
            "email": "johnny.doe@gmail.com",
            "results": [
                {
                    "stats": {"views": 41239, "spend": 5000}
                }
            ]
        }
    ]
}

You ingested the above json data into the following raw table on datalake:

content ingested_at
{"active_users":[{"id":"12e57e","email":"john.doe@gmail.com","results":[{"stats":{"views":12587,"spend":8000}}]},{"id":"r87e5z","email":"jane.doe@gmail.com","results":[{"stats":{"views":97553,"spend":10000}}]},{"id":"8tr75z","email":"johnny.doe@gmail.com","results":[{"stats":{"views":41239,"spend":5000}}]}]} 2022-12-14T04:00:00.000 UTC

Note: content being a STRING and ingested_at a TIMESTAMP.

The table is then parsed and stored into the datapool-pii environment:

id email views spend
12e57e john.doe@gmail.com 12587 8000
r87e5z jane.doe@gmail.com 97553 10000
8tr75z johnny.doe@gmail.com 41239 5000

This table contains sensitive information (i.e. PII data). Thus, before granting access to stakeholders, one must hide or hash the sensitive fields:

id email views spend
12e57e 375320dd9ae7ed408002f3768e16cb5f28c861062fd50dff9a3bff62e9dce4ef 12587 8000
r87e5z 831f6494ad6be4fcb3a724c3d5fef22d3ceffa3c62ef3a7984e45a0ea177f982 97553 10000
8tr75z 980b542e198802ebe4d57690a1ad3e587b5751cb537209112c0564b52bd0699f 41239 5000

The above table is stored on datapool.

Then, on datamart you will find this table aggregated with other metrics and measurements, ready to be used and consumed by the vertical teams.

To go even further

To parse the raw json content field, you can use an Airflow job that will regularly execute the following SQL query:

{% set datalake_project = var.value.datalake_project %}

WITH
  raw_lake_table AS (
  SELECT
    JSON_EXTRACT_ARRAY(content, '$.active_users') AS active_users,
    ingested_at as ingested_at
  FROM
    `{{datalake_project}}.dataset.table`
  WHERE DATE(ingested_at) = "{{ds}}"
  )
SELECT
  JSON_EXTRACT_SCALAR(unnested_active_users, '$.id') as id,
  JSON_EXTRACT_SCALAR(unnested_active_users, '$.email') as email,
  JSON_EXTRACT(results, '$.stats.views') as views,
  JSON_EXTRACT(results, '$.stats.spend') as spend,
  ingested_at
FROM
  raw_lake_table,
  UNNEST(active_users) as unnested_active_users
  LEFT JOIN UNNEST(JSON_EXTRACT_ARRAY(unnested_active_users, '$.results')) as results

Note: You can use Jinja templated variables to parametrize your query. In our case, {{datalake_project}} will be replaced at execution time by datalake-prod or datalake-dev. In the same manner, {{ds}} will be replaced by the current YYYY-MM-DD date. See Airflow Templates Reference for more information.
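
To get a feel for what the templating does, here is a standalone sketch using the jinja2 package directly (outside Airflow, with made-up values):

from jinja2 import Template

query = Template(
    "SELECT * FROM `{{ datalake_project }}.dataset.table` "
    "WHERE DATE(ingested_at) = '{{ ds }}'"
)
print(query.render(datalake_project="datalake-dev", ds="2022-12-14"))
# SELECT * FROM `datalake-dev.dataset.table` WHERE DATE(ingested_at) = '2022-12-14'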

To hash using sha256 on python:

python3> import hashlib
python3> hashlib.sha256("your-string".encode('utf-8')).hexdigest()
'42c23acbf1b1c471c7b53efe58f34dea361d941f47584265df5dbaec1bfddd49'

On BigQuery, you can directly use the SHA256 function to hash your fields in your data pipelines:

SELECT SHA256("your-string") as sha256;
+----------------------------------------------+
| sha256                                       |
+----------------------------------------------+
| QsI6y/GxxHHHtT7+WPNN6jYdlB9HWEJl31267Bv93Uk= |
+----------------------------------------------+

Caution: the return type is BYTES, displayed in base64. If you want to compare it with the Python value returned above, use TO_HEX:

SELECT TO_HEX(SHA256("your-string")) as sha256;
+------------------------------------------------------------------+
| sha256                                                           |
+------------------------------------------------------------------+
| 42c23acbf1b1c471c7b53efe58f34dea361d941f47584265df5dbaec1bfddd49 |
+------------------------------------------------------------------+

Note: Even better, use SHA512.