
Introduction

We have all been there. As your data project grows:

  • Logic gets tightly coupled
  • Repetitive tasks are duplicated
  • Complexity increases

Ordeq tackles these problems by providing a simple yet powerful way to define IO and transformations throughout your project.

How Ordeq helps

Let's see how Ordeq helps you develop data pipelines. We will start with a simple example. Here is what a data pipeline looks like without Ordeq:

__main__.py
from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession.builder.getOrCreate()
    txs = spark.table("txs")
    clients = spark.table("clients")
    txs_and_clients = txs.join(clients, on="client_id", how="left")
    txs_and_clients.write.mode("overwrite").saveAsTable("txs_and_clients")

Ordeq works with any data processing tool

The example above uses PySpark, since it is a popular data processing tool, but the same principles apply to other tools. If you are not familiar with Spark, imagine the same logic implemented with Pandas, Dask, or another tool of your choice.
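For instance, a rough Pandas equivalent of the pipeline above could look like the sketch below. The Parquet file paths are purely illustrative:

import pandas as pd

if __name__ == "__main__":
    # Illustrative paths: point these at wherever your data actually lives
    txs = pd.read_parquet("txs.parquet")
    clients = pd.read_parquet("clients.parquet")
    txs_and_clients = txs.merge(clients, on="client_id", how="left")
    txs_and_clients.to_parquet("txs_and_clients.parquet")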

Suppose we want to add a bit more logic, such as filtering the data by a certain date:

__main__.py
from pyspark.sql import SparkSession
from argparse import ArgumentParser

if __name__ == "__main__":
    spark = SparkSession.builder.getOrCreate()
    parser = ArgumentParser()
    parser.add_argument("--date", type=str)
    date = parser.parse_args().date
    txs = spark.table("txs")
    txs = txs.filter(txs.date == date)
    clients = spark.table("clients")
    txs_and_clients = txs.join(clients, on="client_id", how="left")
    txs_and_clients.write.mode("overwrite").saveAsTable("txs_and_clients")

The code is getting more complex, and you now have to pass the --date argument every time you run the script. The logic also remains tightly coupled, so you cannot easily reuse parts of it.

Improvements

Can we do better? Let's try to modularize the logic by splitting it into functions:

__main__.py
from pyspark.sql import DataFrame, SparkSession
from argparse import ArgumentParser


def load_table(spark: SparkSession, table: str) -> DataFrame:
    return spark.table(table)


def save_table(df: DataFrame, table: str):
    df.write.mode("overwrite").saveAsTable(table)


def parse_date() -> str:
    parser = ArgumentParser()
    parser.add_argument("--date", type=str)
    return parser.parse_args().date


def join_txs_and_clients(txs: DataFrame, clients: DataFrame, date: str):
    txs = txs.filter(txs.date == date)
    return txs.join(clients, on="client_id", how="left")


if __name__ == "__main__":
    spark = SparkSession.builder.getOrCreate()
    date = parse_date()
    txs = load_table(spark, "txs")
    clients = load_table(spark, "clients")
    txs_and_clients = join_txs_and_clients(txs, clients, date)
    save_table(txs_and_clients, "txs_and_clients")

This is much better! Each piece of logic can be tested in isolation, and you can reuse the functions in other parts of your project. However, some challenges remain: you still need to route spark, date, txs and clients through the functions, which couples the IO with the transformations.
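Because join_txs_and_clients now takes plain DataFrames, it can be tested with small in-memory inputs. A minimal sketch, assuming the functions above are moved out of __main__.py into an importable module (here called pipeline.py); the sample rows are made up for illustration:

from pyspark.sql import SparkSession

from pipeline import join_txs_and_clients  # hypothetical module holding the functions above


def test_join_txs_and_clients():
    spark = SparkSession.builder.master("local[1]").getOrCreate()
    txs = spark.createDataFrame(
        [("c1", "2024-01-01", 10.0)], ["client_id", "date", "amount"]
    )
    clients = spark.createDataFrame([("c1", "Alice")], ["client_id", "name"])
    joined = join_txs_and_clients(txs, clients, date="2024-01-01")
    assert joined.count() == 1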

The Ordeq solution

Ordeq dissolves this coupling by separating the IO, such as Spark tables and command line arguments, from the transformations. The transformations reside in pipeline.py, while the IOs are defined in a separate catalog.py module. Lastly, a __main__.py module takes care of running the job:

pipeline.py
import catalog
from ordeq import node
from pyspark.sql import DataFrame


@node(
    inputs=[catalog.txs, catalog.clients, catalog.date],
    outputs=catalog.txs_and_clients,
)
def join_txs_and_clients(
    txs: DataFrame, clients: DataFrame, date: str
) -> DataFrame:
    txs = txs.filter(txs.date == date)
    return txs.join(clients, on="client_id", how="left")
catalog.py
from ordeq_args import CommandLineArg
from ordeq_spark import SparkHiveTable

# Input IOs
date = CommandLineArg("--date", type=str)
txs = SparkHiveTable(table="txs")
clients = SparkHiveTable(table="clients")

# Output IOs
txs_and_clients = SparkHiveTable(table="txs_and_clients")
__main__.py
import logging

from ordeq import run
from pipeline import join_txs_and_clients

# Enable logging
logging.basicConfig(level=logging.INFO)

if __name__ == "__main__":
    run(join_txs_and_clients)
(Pipeline diagram: the txs and clients SparkHiveTables and the date CommandLineArg feed into the join_txs_and_clients node, which writes to the txs_and_clients SparkHiveTable.)

Visualising pipelines with Ordeq

This diagram is auto-generated by Ordeq. See Run and Viz for more information.

The idea behind the separation is that changes to the IO should not affect the transformations, and vice versa. Furthermore, the separation helps you keep your project organized.

Decoupling in practice

Changing IO

If you want to read the transactions from an Iceberg table instead of a Hive table, you only need to change catalog.py:

pipeline.py
import catalog
from ordeq import node
from pyspark.sql import DataFrame


@node(
    inputs=[catalog.txs, catalog.clients, catalog.date],
    outputs=catalog.txs_and_clients,
)
def join_txs_and_clients(
    txs: DataFrame, clients: DataFrame, date: str
) -> DataFrame:
    txs = txs.filter(txs.date == date)
    return txs.join(clients, on="client_id", how="left")
catalog.py
from ordeq_args import CommandLineArg
from ordeq_spark import SparkHiveTable, SparkIcebergTable

# Input IOs
date = CommandLineArg("--date", type=str)
txs = SparkIcebergTable(table="txs")
clients = SparkHiveTable(table="clients")

# Output IOs
txs_and_clients = SparkHiveTable(table="txs_and_clients")
__main__.py
import logging

from ordeq import run
from pipeline import join_txs_and_clients

# Enable logging
logging.basicConfig(level=logging.INFO)

if __name__ == "__main__":
    run(join_txs_and_clients)
(Pipeline diagram: txs is now a SparkIcebergTable; the clients SparkHiveTable and the date CommandLineArg still feed the join_txs_and_clients node, which writes to the txs_and_clients SparkHiveTable.)

Or, maybe the date comes from an environment variable instead of a command line argument:

pipeline.py
import catalog
from ordeq import node
from pyspark.sql import DataFrame


@node(
    inputs=[catalog.txs, catalog.clients, catalog.date],
    outputs=catalog.txs_and_clients,
)
def join_txs_and_clients(
    txs: DataFrame, clients: DataFrame, date: str
) -> DataFrame:
    txs = txs.filter(txs.date == date)
    return txs.join(clients, on="client_id", how="left")
catalog.py
from ordeq_args import EnvironmentVariable
from ordeq_spark import SparkIcebergTable, SparkHiveTable

# Input IOs
date = EnvironmentVariable("DATE")
txs = SparkIcebergTable(table="txs")
clients = SparkHiveTable(table="clients")

# Output IOs
txs_and_clients = SparkHiveTable(table="txs_and_clients")
__main__.py
import logging

from ordeq import run
from pipeline import join_txs_and_clients

# Enable logging
logging.basicConfig(level=logging.INFO)

if __name__ == "__main__":
    run(join_txs_and_clients)
(Pipeline diagram: date is now an EnvironmentVariable; together with the txs SparkIcebergTable and the clients SparkHiveTable it feeds the join_txs_and_clients node, which writes to the txs_and_clients SparkHiveTable.)

Perhaps you want to append data to the txs_and_clients table instead of overwriting it:

pipeline.py
import catalog
from ordeq import node
from pyspark.sql import DataFrame


@node(
    inputs=[catalog.txs, catalog.clients, catalog.date],
    outputs=catalog.txs_and_clients,
)
def join_txs_and_clients(
    txs: DataFrame, clients: DataFrame, date: str
) -> DataFrame:
    txs = txs.filter(txs.date == date)
    return txs.join(clients, on="client_id", how="left")
catalog.py
from ordeq_args import EnvironmentVariable
from ordeq_spark import SparkIcebergTable, SparkHiveTable

# Input IOs
date = EnvironmentVariable("DATE")
txs = SparkIcebergTable(table="txs")
clients = SparkHiveTable(table="clients")

# Output IOs
txs_and_clients = SparkHiveTable(table="txs_and_clients").with_save_options(
    mode="append"
)
__main__.py
import logging

from ordeq import run
from pipeline import join_txs_and_clients

# Enable logging
logging.basicConfig(level=logging.INFO)

if __name__ == "__main__":
    run(join_txs_and_clients)
(Pipeline diagram: identical to the previous one; changing the save mode does not change the graph structure.)

All changes above require only amendments to catalog.py. The transformations in pipeline.py remain unchanged. Furthermore, each IO is defined once and can be reused throughout your project.
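For example, another pipeline module could reuse catalog.clients without redefining it. The select_active_clients node and the active_clients output IO below are hypothetical and would need to be added to your project:

import catalog
from ordeq import node
from pyspark.sql import DataFrame


@node(inputs=catalog.clients, outputs=catalog.active_clients)
def select_active_clients(clients: DataFrame) -> DataFrame:
    # Reuses the clients IO that is defined once in catalog.py
    return clients.where(clients.status == "active")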

Changing transformations

Conversely, if you want to change how transactions and clients are joined, you only need to change pipeline.py. For example, you might want to filter out inactive clients and transactions with a non-positive amount:

pipeline.py
import catalog
from ordeq import node
from pyspark.sql import DataFrame


@node(
    inputs=[catalog.txs, catalog.clients, catalog.date],
    outputs=catalog.txs_and_clients,
)
def join_txs_and_clients(
    txs: DataFrame, clients: DataFrame, date: str
) -> DataFrame:
    txs = txs.filter(txs.date == date)
    txs_and_clients = txs.join(clients, on="client_id", how="inner")
    return txs_and_clients.where(
        (txs_and_clients.amount > 0) & (txs_and_clients.status == "active")
    )
catalog.py
from ordeq_args import EnvironmentVariable
from ordeq_spark import SparkHiveTable, SparkIcebergTable

# Input IOs
date = EnvironmentVariable("DATE")
txs = SparkIcebergTable(table="txs")
clients = SparkHiveTable(table="clients")

# Output IOs
txs_and_clients = SparkHiveTable(table="txs_and_clients").with_save_options(
    mode="append"
)
__main__.py
import logging

from ordeq import run
from pipeline import join_txs_and_clients

# Enable logging
logging.basicConfig(level=logging.INFO)

if __name__ == "__main__":
    run(join_txs_and_clients)
(Pipeline diagram: identical to the previous one; changing the transformation logic does not change the graph structure.)

The example above only changes the join_txs_and_clients node. What if we want to add more nodes? For example, we might want to add a node that aggregates the transaction amount by client:

pipeline.py
import catalog
from ordeq import node
from pyspark.sql import DataFrame


@node(
    inputs=[catalog.txs, catalog.clients, catalog.date],
    outputs=catalog.txs_and_clients,
)
def join_txs_and_clients(
    txs: DataFrame, clients: DataFrame, date: str
) -> DataFrame:
    txs = txs.filter(txs.date == date)
    txs_and_clients = txs.join(clients, on="client_id", how="inner")
    return txs_and_clients.where(
        (txs_and_clients.amount > 0) & (txs_and_clients.status == "active")
    )


@node(inputs=catalog.txs_and_clients, outputs=catalog.aggregated_txs)
def aggregate_txs(txs_and_clients: DataFrame) -> DataFrame:
    return txs_and_clients.groupBy("client_id").sum("amount")
catalog.py
from ordeq_args import EnvironmentVariable
from ordeq_spark import SparkIcebergTable, SparkHiveTable

# Input IOs
date = EnvironmentVariable("DATE")
txs = SparkIcebergTable(table="txs")
clients = SparkHiveTable(table="clients")

# Intermediate IOs
txs_and_clients = SparkHiveTable(table="txs_and_clients").with_save_options(
    mode="append"
)

# Output IOs
aggregated_txs = SparkHiveTable(table="aggregated_txs").with_save_options(
    partition_by="date"
)
__main__.py
import logging

from ordeq import run
import pipeline

# Enable logging
logging.basicConfig(level=logging.INFO)

if __name__ == "__main__":
    run(pipeline)
    # This is equivalent to running:
    # >>> from pipeline import join_txs_and_clients, aggregate_txs
    # >>> run(join_txs_and_clients, aggregate_txs)
(Pipeline diagram: the txs SparkIcebergTable, the clients SparkHiveTable and the date EnvironmentVariable feed join_txs_and_clients, which writes the txs_and_clients SparkHiveTable; txs_and_clients then feeds the new aggregate_txs node, which writes the aggregated_txs SparkHiveTable.)

This example shows how easy it is to grow and maintain your project with Ordeq. You can add new nodes and IO without changing existing code. Each transformation is modular and isolated.

Running the pipeline

The run function takes care of loading the inputs and saving the outputs of each node, so you no longer need to route the inputs and outputs of each transformation through the functions yourself. Dependencies between nodes are resolved automatically.
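Conceptually, you can picture run doing something like the sketch below for each node, in dependency order. This is a simplified mental model rather than Ordeq's actual implementation, and the load and save method names are illustrative:

def run_node(func, inputs, output):
    # Sketch only: load each declared input IO (e.g. read a Spark table,
    # parse a command line argument, read an environment variable)
    loaded = [io.load() for io in inputs]
    # Call the transformation with the loaded values
    result = func(*loaded)
    # Persist the result through the output IO (e.g. write a Spark table)
    output.save(result)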

Where to go from here?