
Airflow

This guide demonstrates how to orchestrate an Ordeq pipeline using Apache Airflow. Airflow is a tool to programmatically author, schedule, and monitor workflows. Ordeq is complementary to Airflow: while Airflow focuses on orchestration, Ordeq specializes in building modular and maintainable pipelines.

Familiarize yourself with running Ordeq projects

To get the most out of this guide, we recommend familiarizing yourself with running Ordeq projects. For instance, have a look at the running guide if you haven't done so already.

The code examples presented in this section can be found here.

Example: air quality pipeline

We will use the air_quality pipeline as an example to demonstrate how to orchestrate an Ordeq pipeline using Airflow. This pipeline ingests air quality data from an external API, processes it, and generates insights. The example is adapted from the Airflow docs.

Let's inspect the project first:

air_quality/pipeline.py:

import duckdb
import pandas as pd
from ordeq import node

from air_quality import catalog


@node(inputs=catalog.air_quality_json, outputs=catalog.air_quality_data)
def ingest(air_quality: dict) -> duckdb.DuckDBPyRelation:
    data = air_quality.get("current")
    df = pd.DataFrame(data, index=[0])
    df["date"] = df["time"].astype("datetime64[ns]")
    return duckdb.from_df(df)


@node(inputs=catalog.air_quality_data, outputs=catalog.air_quality_insights)
def aggregate(air_quality: duckdb.DuckDBPyRelation) -> duckdb.DuckDBPyRelation:
    return air_quality.aggregate(
        "DATE(time) AS date, "
        "AVG(pm2_5) AS avg_pm2_5, "
        "AVG(european_aqi) AS avg_european_aqi, "
        "AVG(us_aqi) AS avg_us_aqi"
    ).select("date, avg_pm2_5, avg_european_aqi, avg_us_aqi")

air_quality/catalog.py:

from ordeq_duckdb import DuckDBParquet
from ordeq_requests import ResponseJSON

from air_quality.paths import DATA_DIRECTORY

air_quality_json = ResponseJSON(
    url="https://air-quality-api.open-meteo.com/v1/air-quality"
).with_load_options(
    params={
        "latitude": 28.6139,
        "longitude": 77.2090,
        "current": ",".join({
            "pm10": "float64",
            "pm2_5": "float64",
            "carbon_monoxide": "float64",
            "nitrogen_dioxide": "float64",
            "sulphur_dioxide": "float64",
            "ozone": "float64",
            "european_aqi": "float64",
            "us_aqi": "float64",
        }),
        "timezone": "UTC",
    }
)

air_quality_data = (
    DuckDBParquet(path=str(DATA_DIRECTORY / "air_quality"))
).with_save_options(partition_by=["date"], overwrite=True)

air_quality_insights = DuckDBParquet(
    path=str(DATA_DIRECTORY / "air_quality_insights.parquet")
)

The run script:

from ordeq import run

from air_quality import pipeline

if __name__ == "__main__":
    run(pipeline)

The pipeline, visualized:

graph TB
    air_quality.catalog:air_quality_json --> air_quality.pipeline:ingest
    air_quality.pipeline:ingest --> air_quality.catalog:air_quality_data
    air_quality.catalog:air_quality_data --> air_quality.pipeline:aggregate
    air_quality.pipeline:aggregate --> air_quality.catalog:air_quality_insights

    air_quality.pipeline:ingest@{shape: rounded, label: "ingest"}
    air_quality.pipeline:aggregate@{shape: rounded, label: "aggregate"}
    air_quality.catalog:air_quality_data@{shape: rect, label: "air_quality_data"}
    air_quality.catalog:air_quality_insights@{shape: rect, label: "air_quality_insights"}
    air_quality.catalog:air_quality_json@{shape: rect, label: "air_quality_json"}

    class air_quality.pipeline:ingest,air_quality.pipeline:aggregate node
    class air_quality.catalog:air_quality_data,air_quality.catalog:air_quality_insights io0
    class air_quality.catalog:air_quality_json io1
    classDef node fill:#008AD7,color:#FFF
    classDef io fill:#FFD43B
    classDef io0 fill:#66c2a5
    classDef io1 fill:#fc8d62

An example response from the air quality API:

{
  'latitude': 28.599998,
  'longitude': 77.20001,
  'generationtime_ms': 0.27811527252197266,
  'utc_offset_seconds': 0,
  'timezone': 'GMT',
  'timezone_abbreviation': 'GMT',
  'elevation': 214.0,
  'current_units': {
    'time': 'iso8601',
    'interval': 'seconds',
    'pm10': 'μg/m³',
    'pm2_5': 'μg/m³',
    'carbon_monoxide': 'μg/m³',
    'nitrogen_dioxide': 'μg/m³',
    'sulphur_dioxide': 'μg/m³',
    'ozone': 'μg/m³',
    'european_aqi': 'EAQI',
    'us_aqi': 'USAQI'
  },
  'current': {
    'time': '2025-12-09T17:00',
    'interval': 3600,
    'pm10': 69.8,
    'pm2_5': 68.9,
    'carbon_monoxide': 1048.0,
    'nitrogen_dioxide': 36.4,
    'sulphur_dioxide': 30.5,
    'ozone': 63.0,
    'european_aqi': 85,
    'us_aqi': 150
  }
}

The pipeline consists of two nodes: ingest and aggregate. The ingest node fetches air quality data from the API and stores it as Parquet; an example response from this API is shown above. The aggregate node computes summary statistics from the ingested data.
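After a run, you can sanity-check the persisted outputs with plain DuckDB, independent of Ordeq. A minimal sketch, assuming the default paths from the catalog above and that the pipeline has already been executed:

# Quick, Ordeq-independent check of the pipeline outputs with DuckDB.
# Assumes the pipeline has run and written to the paths from the catalog.
import duckdb

from air_quality.paths import DATA_DIRECTORY

insights = duckdb.read_parquet(str(DATA_DIRECTORY / "air_quality_insights.parquet"))
print(insights)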

Orchestrating with Airflow

This section will discuss three approaches to orchestrate the air_quality pipeline using Airflow. If you want to follow along, first make sure to set up the Airflow environment using the instructions in the README.

Airflow syntax

The code in this guide has been created for Airflow 3, and uses the TaskFlow API. The patterns and ideas apply to older Airflow versions as well, but the syntax may differ. Please refer to the Airflow docs for more information.

Running individual nodes

The first approach to orchestrating an Ordeq pipeline with Airflow is to run each Ordeq node as an individual Airflow task. Each task uses the @task decorator, which wraps the PythonOperator, to execute the corresponding Ordeq node. The Airflow DAG for the air quality pipeline then looks as follows:

import pendulum
from air_quality import pipeline
from airflow.sdk import dag, task
from ordeq import run


@dag(
    dag_id="dag_nodes",
    schedule="@daily",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
)
def dag_nodes():
    @task(task_id="task_run_ingest")
    def task_run_ingest(**kwargs):
        return run(pipeline.ingest)

    @task(task_id="task_run_aggregate")
    def task_run_aggregate(**kwargs):
        return run(pipeline.aggregate)

    ingest = task_run_ingest()
    aggregate = task_run_aggregate()

    ingest >> aggregate


_ = dag_nodes()

Each task in the DAG corresponds to one Ordeq node. The dependencies between the tasks are set using the >> operator. This is needed because Airflow cannot infer the dependencies between tasks from the Ordeq pipeline structure.

This approach is suitable when you want fine-grained control over the execution of each node and want to monitor each node's status individually. Running node-by-node is less efficient than running the entire pipeline in a single task, as it incurs the overhead of task scheduling and context switching for each node. It is also not suitable if a node produces outputs that are not persisted, as the subsequent task cannot access the in-memory outputs of the previous task.
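For pipelines with more than a handful of nodes, writing one wrapper function per node becomes repetitive. The same pattern can also be generated in a loop. The sketch below lists the node objects and their execution order by hand (it does not introspect the Ordeq pipeline) and relies only on run(); the dag_id and task_id names are illustrative:

import pendulum
from air_quality import pipeline
from airflow.sdk import dag, task
from ordeq import run


@dag(
    dag_id="dag_nodes_generated",
    schedule="@daily",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
)
def dag_nodes_generated():
    # Node objects and their execution order, written out by hand.
    nodes = {"ingest": pipeline.ingest, "aggregate": pipeline.aggregate}

    previous = None
    for name, node_obj in nodes.items():

        @task(task_id=f"task_run_{name}")
        def task_run(node_obj=node_obj, **kwargs):
            return run(node_obj)

        current = task_run()
        if previous is not None:
            previous >> current  # chain tasks in the listed order
        previous = current


_ = dag_nodes_generated()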

Running an entire pipeline

Instead of running each Ordeq node as an individual Airflow task, you can also run the entire Ordeq pipeline within a single Airflow task. This approach uses the PythonOperator to execute the run function on the entire pipeline. The Airflow DAG looks as follows:

import air_quality
import pendulum
from airflow.sdk import dag, task
from ordeq import run


@dag(
    dag_id="dag_pipeline",
    schedule="@daily",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
)
def dag_pipeline():
    @task
    def task_run_pipeline(**kwargs):
        return run(air_quality)

    return task_run_pipeline()


_ = dag_pipeline()

This approach is more efficient than running node-by-node, but it sacrifices the fine-grained control and monitoring of individual nodes. It is most suitable when the pipeline is relatively small and you want to keep the DAG simple.
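Because the whole pipeline runs in a single task, this DAG is also easy to exercise locally without a scheduler. A minimal sketch, assuming you keep the DAG object in a variable (instead of discarding it as `_ = dag_pipeline()` above) and that the dag.test() debug utility is available in your Airflow version:

# At the bottom of the DAG file above: keep a reference to the DAG object
# and use dag.test() for a quick in-process run (recent Airflow versions).
pipeline_dag = dag_pipeline()

if __name__ == "__main__":
    pipeline_dag.test()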

Running with Docker operator

The approaches above use the PythonOperator to run Ordeq nodes or pipelines within Airflow tasks, which requires the Airflow environment to have all pipeline dependencies installed. An alternative is to package the project as a Docker image and run it as a container using the DockerOperator.

Check out the Docker integration guide

If you are new to using Docker with Ordeq, check out the Docker integration guide for more details on how to set up and use Docker with Ordeq.

The Airflow DAG can be defined as follows:

import pendulum
from airflow.sdk import dag, task


@dag(
    dag_id="dag_docker",
    schedule="@daily",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
)
def dag_docker():
    @task.docker(task_id="run_docker", image="air_quality:latest")
    def run_docker(**kwargs):
        # The function body is executed inside the container, so these
        # imports resolve against the dependencies installed in the image.
        import air_quality
        from ordeq import run

        run(air_quality)

    run_docker()


_ = dag_docker()

The @task.docker decorator wraps the DockerOperator: Airflow starts a container from the given image and executes the decorated function inside it. Because the image bundles the air_quality project and its dependencies, the function body can simply import the project and run the pipeline. For more details on how to configure the Docker operator, please refer to the Airflow documentation.
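If you prefer to specify the command explicitly rather than serializing a Python callable into the container, you can use the classic DockerOperator and its command parameter. A minimal sketch; the command python -m air_quality is an assumption, so substitute whatever command your image defines to run the pipeline:

import pendulum
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.sdk import dag


@dag(
    dag_id="dag_docker_command",
    schedule="@daily",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
)
def dag_docker_command():
    # The command is a placeholder; point it at however your image runs
    # the pipeline (for example, a run script baked into the image).
    DockerOperator(
        task_id="run_docker_command",
        image="air_quality:latest",
        command="python -m air_quality",
    )


_ = dag_docker_command()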

To run the DAG with the Docker operator locally, you first have to build the image:

docker build -t air_quality:latest .

Next, you can launch the Airflow environment and trigger the DAG from the UI.

Questions or feedback on this guide?

If you have any questions or feedback regarding this guide, feel free to open an issue on GitHub.