Introduction¶
We have all been there. As your data project grows:
- Logic gets tightly coupled
- Repetitive tasks are duplicated
- Complexity increases
Ordeq tackles these problems by providing a simple yet powerful way to define IO and transformations throughout your project.
How Ordeq helps¶
Let's see how Ordeq helps you develop data pipelines. We will start with a simple example of what a data pipeline looks like without Ordeq.
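A minimal sketch (the Hive table names and the `client_id` join key are illustrative assumptions, reused throughout this page):

```python
# __main__.py
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read, transform and write, all in one tightly coupled script
txs = spark.read.table("txs")
clients = spark.read.table("clients")
txs_and_clients = txs.join(clients, on="client_id")
txs_and_clients.write.saveAsTable("txs_and_clients")
```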
Ordeq works with any data processing tool
The example above uses PySpark, since it is a popular data processing tool, but the same principles apply to other tools. If you are not familiar with Spark, imagine the same logic implemented with Pandas, Dask, or another tool of your choice.
Suppose we want to add a bit more logic, such as filtering the data by a certain date.
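Under the same assumptions, the script might grow as follows, with the date arriving as a command line argument:

```python
# __main__.py
import sys

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# The date must be supplied on every run, e.g.:
#   spark-submit __main__.py 2024-01-01
date = sys.argv[1]

txs = spark.read.table("txs")
clients = spark.read.table("clients")
txs = txs.filter(txs["date"] >= date)
txs_and_clients = txs.join(clients, on="client_id")
txs_and_clients.write.saveAsTable("txs_and_clients")
```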
The code is getting more complex, and now you have to pass the date argument every time you run the script. Also, the logic is still tightly coupled, and you cannot easily reuse parts of it.
Improvements¶
Can we do better? Let's try to modularize the logic by splitting it into functions.
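One possible shape for that refactoring, still under the same assumptions:

```python
# __main__.py
import sys

from pyspark.sql import DataFrame, SparkSession


def load_txs(spark: SparkSession) -> DataFrame:
    return spark.read.table("txs")


def load_clients(spark: SparkSession) -> DataFrame:
    return spark.read.table("clients")


def join_txs_and_clients(
    txs: DataFrame, clients: DataFrame, date: str
) -> DataFrame:
    txs = txs.filter(txs["date"] >= date)
    return txs.join(clients, on="client_id")


def save_txs_and_clients(df: DataFrame) -> None:
    df.write.saveAsTable("txs_and_clients")


def main() -> None:
    # IO and transformations still meet here: spark, date, txs and
    # clients all have to be threaded through the functions by hand.
    spark = SparkSession.builder.getOrCreate()
    date = sys.argv[1]
    txs_and_clients = join_txs_and_clients(
        load_txs(spark), load_clients(spark), date
    )
    save_txs_and_clients(txs_and_clients)


if __name__ == "__main__":
    main()
```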
This is much better! Each piece of logic can be tested in isolation.
You can reuse the functions in other parts of your project.
However, there are still some challenges.
You still need to route `spark`, `date`, `txs` and `clients` through the functions.
This couples the IO with the transformations.
The Ordeq solution¶
Ordeq dissolves the coupling by separating IO, such as Spark tables and command line arguments, from the transformations.
While the transformations reside in `pipeline.py`, IOs are defined in a separate `catalog` module.
Lastly, a `__main__.py` module takes care of running the job.
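A sketch of how the three modules might look. The IO class names (`SparkHiveTable`, `CommandLineArg`) appear in the diagram below; the import paths and constructor signatures are assumptions:

```python
# pipeline.py -- transformations only, no IO details
from ordeq import node  # assumed import path
from pyspark.sql import DataFrame

import catalog


@node(
    inputs=[catalog.txs, catalog.clients, catalog.date],
    outputs=[catalog.txs_and_clients],
)
def join_txs_and_clients(
    txs: DataFrame, clients: DataFrame, date: str
) -> DataFrame:
    txs = txs.filter(txs["date"] >= date)
    return txs.join(clients, on="client_id")
```

```python
# catalog.py -- every IO is declared exactly once
from ordeq_args import CommandLineArg  # assumed import path
from ordeq_spark import SparkHiveTable  # assumed import path

txs = SparkHiveTable(table="txs")
clients = SparkHiveTable(table="clients")
date = CommandLineArg(name="--date")
txs_and_clients = SparkHiveTable(table="txs_and_clients")
```

```python
# __main__.py -- only responsible for running the job
from ordeq import run  # assumed import path

import pipeline

if __name__ == "__main__":
    run(pipeline)
```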
```mermaid
graph TB
    subgraph legend["Legend"]
        direction TB
        subgraph Objects
            L0(["Node"]):::node
            L1[("IO")]:::io
        end
        subgraph io_types["IO Types"]
            L00[("CommandLineArg")]:::io0
            L01[("SparkHiveTable")]:::io1
        end
    end
    IO0 --> join_txs_and_clients
    IO1 --> join_txs_and_clients
    IO2 --> join_txs_and_clients
    join_txs_and_clients --> IO3
    subgraph pipeline["Pipeline"]
        direction TB
        join_txs_and_clients(["join_txs_and_clients"]):::node
        IO0[("txs")]:::io1
        IO1[("clients")]:::io1
        IO2[("date")]:::io0
        IO3[("txs_and_clients")]:::io1
    end
    classDef node fill:#008AD7,color:#FFF
    classDef io fill:#FFD43B
    classDef io0 fill:#66c2a5
    classDef io1 fill:#fc8d62
```
Visualising pipelines with Ordeq
This diagram is auto-generated by Ordeq. See Run and Viz for more information.
The idea behind the separation is that changes to the IO should not affect the transformations, and vice versa. Furthermore, the separation helps you keep your project organized.
Decoupling in practice¶
Changing IO¶
Say you want to read the transactions from an Iceberg table instead of a Hive table:
you only need to change `catalog.py`.
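In the sketched `catalog.py`, this is a one-line change (the `SparkIcebergTable` class name is taken from the diagram below; its constructor is an assumption):

```python
# catalog.py
from ordeq_args import CommandLineArg  # assumed import paths
from ordeq_spark import SparkHiveTable, SparkIcebergTable

txs = SparkIcebergTable(table="txs")  # was: SparkHiveTable(table="txs")
clients = SparkHiveTable(table="clients")
date = CommandLineArg(name="--date")
txs_and_clients = SparkHiveTable(table="txs_and_clients")
```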
```mermaid
graph TB
    subgraph legend["Legend"]
        direction TB
        subgraph Objects
            L0(["Node"]):::node
            L1[("IO")]:::io
        end
        subgraph io_types["IO Types"]
            L00[("CommandLineArg")]:::io0
            L01[("SparkHiveTable")]:::io1
            L02[("SparkIcebergTable")]:::io2
        end
    end
    IO0 --> join_txs_and_clients
    IO1 --> join_txs_and_clients
    IO2 --> join_txs_and_clients
    join_txs_and_clients --> IO3
    subgraph pipeline["Pipeline"]
        direction TB
        join_txs_and_clients(["join_txs_and_clients"]):::node
        IO0[("txs")]:::io2
        IO1[("clients")]:::io1
        IO2[("date")]:::io0
        IO3[("txs_and_clients")]:::io1
    end
    classDef node fill:#008AD7,color:#FFF
    classDef io fill:#FFD43B
    classDef io0 fill:#66c2a5
    classDef io1 fill:#fc8d62
    classDef io2 fill:#8da0cb
```
Or, maybe the date comes from an environment variable instead of a command line argument.
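Again a one-line change in the sketch (the `EnvironmentVariable` class name comes from the diagram below; its import path and constructor are assumptions):

```python
# catalog.py
from ordeq_env import EnvironmentVariable  # assumed import path
from ordeq_spark import SparkHiveTable, SparkIcebergTable

txs = SparkIcebergTable(table="txs")
clients = SparkHiveTable(table="clients")
date = EnvironmentVariable(name="DATE")  # was: CommandLineArg(name="--date")
txs_and_clients = SparkHiveTable(table="txs_and_clients")
```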
```mermaid
graph TB
    subgraph legend["Legend"]
        direction TB
        subgraph Objects
            L0(["Node"]):::node
            L1[("IO")]:::io
        end
        subgraph io_types["IO Types"]
            L00[("EnvironmentVariable")]:::io0
            L01[("SparkHiveTable")]:::io1
            L02[("SparkIcebergTable")]:::io2
        end
    end
    IO0 --> join_txs_and_clients
    IO1 --> join_txs_and_clients
    IO2 --> join_txs_and_clients
    join_txs_and_clients --> IO3
    subgraph pipeline["Pipeline"]
        direction TB
        join_txs_and_clients(["join_txs_and_clients"]):::node
        IO0[("txs")]:::io2
        IO1[("clients")]:::io1
        IO2[("date")]:::io0
        IO3[("txs_and_clients")]:::io1
    end
    classDef node fill:#008AD7,color:#FFF
    classDef io fill:#FFD43B
    classDef io0 fill:#66c2a5
    classDef io1 fill:#fc8d62
    classDef io2 fill:#8da0cb
```
Perhaps you want to append data to the `txs_and_clients` table instead of overwriting it.
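In the sketch, this too stays confined to `catalog.py`; the `mode` parameter below is hypothetical, mirroring Spark's own save modes:

```python
# catalog.py
from ordeq_env import EnvironmentVariable  # assumed import path
from ordeq_spark import SparkHiveTable, SparkIcebergTable

txs = SparkIcebergTable(table="txs")
clients = SparkHiveTable(table="clients")
date = EnvironmentVariable(name="DATE")
txs_and_clients = SparkHiveTable(
    table="txs_and_clients",
    mode="append",  # hypothetical parameter: append instead of overwrite
)
```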
```mermaid
graph TB
    subgraph legend["Legend"]
        direction TB
        subgraph Objects
            L0(["Node"]):::node
            L1[("IO")]:::io
        end
        subgraph io_types["IO Types"]
            L00[("EnvironmentVariable")]:::io0
            L01[("SparkHiveTable")]:::io1
            L02[("SparkIcebergTable")]:::io2
        end
    end
    IO0 --> join_txs_and_clients
    IO1 --> join_txs_and_clients
    IO2 --> join_txs_and_clients
    join_txs_and_clients --> IO3
    subgraph pipeline["Pipeline"]
        direction TB
        join_txs_and_clients(["join_txs_and_clients"]):::node
        IO0[("txs")]:::io2
        IO1[("clients")]:::io1
        IO2[("date")]:::io0
        IO3[("txs_and_clients")]:::io1
    end
    classDef node fill:#008AD7,color:#FFF
    classDef io fill:#FFD43B
    classDef io0 fill:#66c2a5
    classDef io1 fill:#fc8d62
    classDef io2 fill:#8da0cb
```
All changes above require only amendments to `catalog.py`.
The transformations in `pipeline.py` remain unchanged.
Furthermore, each IO is defined once and can be reused throughout your project.
Changing transformations¶
Vice versa, if you want to change the logic of how transactions and clients are joined, you only need to change `pipeline.py`.
For example, you might want to filter out inactive clients and transactions with a non-positive amount.
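In the sketched `pipeline.py`, the node body grows while its inputs and outputs stay the same (the `active` and `amount` columns are illustrative):

```python
# pipeline.py
from ordeq import node  # assumed import path
from pyspark.sql import DataFrame

import catalog


@node(
    inputs=[catalog.txs, catalog.clients, catalog.date],
    outputs=[catalog.txs_and_clients],
)
def join_txs_and_clients(
    txs: DataFrame, clients: DataFrame, date: str
) -> DataFrame:
    txs = txs.filter((txs["date"] >= date) & (txs["amount"] > 0))
    clients = clients.filter(clients["active"])
    return txs.join(clients, on="client_id")
```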
```mermaid
graph TB
    subgraph legend["Legend"]
        direction TB
        subgraph Objects
            L0(["Node"]):::node
            L1[("IO")]:::io
        end
        subgraph io_types["IO Types"]
            L00[("EnvironmentVariable")]:::io0
            L01[("SparkHiveTable")]:::io1
            L02[("SparkIcebergTable")]:::io2
        end
    end
    IO0 --> join_txs_and_clients
    IO1 --> join_txs_and_clients
    IO2 --> join_txs_and_clients
    join_txs_and_clients --> IO3
    subgraph pipeline["Pipeline"]
        direction TB
        join_txs_and_clients(["join_txs_and_clients"]):::node
        IO0[("txs")]:::io2
        IO1[("clients")]:::io1
        IO2[("date")]:::io0
        IO3[("txs_and_clients")]:::io1
    end
    classDef node fill:#008AD7,color:#FFF
    classDef io fill:#FFD43B
    classDef io0 fill:#66c2a5
    classDef io1 fill:#fc8d62
    classDef io2 fill:#8da0cb
```
The example above only changes the `join_txs_and_clients` node.
What if we want to add more nodes?
For example, we might want to add a node that aggregates the transaction amount by client.
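Sticking with the sketch, the new node reads the output of the existing one, and the new IO gets a single entry in `catalog.py` (the `amount` column and the aggregation are illustrative):

```python
# pipeline.py (new node only)
from ordeq import node  # assumed import path
from pyspark.sql import DataFrame
from pyspark.sql import functions as F

import catalog


@node(
    inputs=[catalog.txs_and_clients],
    outputs=[catalog.aggregated_txs],
)
def aggregate_txs(txs_and_clients: DataFrame) -> DataFrame:
    # Total transaction amount per client
    return txs_and_clients.groupBy("client_id").agg(
        F.sum("amount").alias("total_amount")
    )
```

```python
# catalog.py (new entry only)
aggregated_txs = SparkHiveTable(table="aggregated_txs")
```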
```mermaid
graph TB
    subgraph legend["Legend"]
        direction TB
        subgraph Objects
            L0(["Node"]):::node
            L1[("IO")]:::io
        end
        subgraph io_types["IO Types"]
            L00[("EnvironmentVariable")]:::io0
            L01[("SparkHiveTable")]:::io1
            L02[("SparkIcebergTable")]:::io2
        end
    end
    IO0 --> join_txs_and_clients
    IO1 --> join_txs_and_clients
    IO2 --> join_txs_and_clients
    join_txs_and_clients --> IO3
    IO3 --> aggregate_txs
    aggregate_txs --> IO4
    subgraph pipeline["Pipeline"]
        direction TB
        join_txs_and_clients(["join_txs_and_clients"]):::node
        aggregate_txs(["aggregate_txs"]):::node
        IO0[("txs")]:::io2
        IO1[("clients")]:::io1
        IO2[("date")]:::io0
        IO3[("txs_and_clients")]:::io1
        IO4[("aggregated_txs")]:::io1
    end
    classDef node fill:#008AD7,color:#FFF
    classDef io fill:#FFD43B
    classDef io0 fill:#66c2a5
    classDef io1 fill:#fc8d62
    classDef io2 fill:#8da0cb
```
This example shows how easy it is to grow and maintain your project with Ordeq. You can add new nodes and IO without changing existing code. Each transformation is modular and isolated.
Running the pipeline¶
The `run` function takes care of loading the inputs and saving the outputs of each node.
You no longer need to route them through the functions by hand.
Dependencies between nodes are automatically resolved.
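In the sketch, this means `__main__.py` never changes as the pipeline grows:

```python
# __main__.py
from ordeq import run  # assumed import path

import pipeline

if __name__ == "__main__":
    # Loads txs, clients and date, runs join_txs_and_clients,
    # then aggregate_txs, saving each node's outputs in between.
    run(pipeline)
```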
Where to go from here?¶
- Learn more about Ordeq's core concepts