Nodes
Nodes are functions representing a piece of data pipeline logic.
Typically, a node takes some data as input, transforms it, and returns the transformed data as output.
Nodes can be created by decorating a function with the @node
decorator:
from ordeq import node
from typing import Iterable
@node
def greet(names: Iterable[str]) -> None:
"""Prints a greeting for each person."""
for name in names:
print(f"Hello, {name}!")
Like functions, each node can have no, one, or more than one input(s), and return no, one, or more than one output(s). In data pipelines, the inputs and outputs usually represent data that is processed by the pipeline.
Specifying inputs and outputs
Suppose we want to greet a list of people whose names are provided in a CSV.
Let's use the CSV IO discussed in the IO section.
First we define the CSV IO in catalog.py
. Next, we modify the node in nodes.py
:
from ordeq import node
import catalog
@node(inputs=catalog.names)
def greet(names: tuple[str, ...]):
"""Prints a greeting for each person."""
for name in names:
print(f"Hello, {name}!")
from ordeq_files import CSV
from pathlib import Path
names = CSV(path=Path("names.csv"))
By specifying names
as the input, we inform Ordeq that the greet
node should use the data from names.csv
when the node is run.
Where to define IOs
Although you can define IOs anywhere in your project, it is best practice to define them in a separate module. Such a module is often referred to as a "catalog" and is discussed in more detail in the catalogs section.
Similarly, we can add a greetings
IO and specify it as output to the greet
node:
import catalog
@node(inputs=catalog.names, outputs=catalog.greetings)
def greet(names: tuple[str, ...]) -> list[str]:
"""Returns a greeting for each person."""
greetings = []
for name in names:
greetings.append(f"Hello, {name}!")
return greetings
from ordeq_files import CSV, Text
from pathlib import Path
names = CSV(path=Path("names.csv"))
greetings = Text(path=Path("greetings.txt"))
Nodes behave like plain functions
The @node
decorator only registers the function as a node, it does not change the function's behavior:
>>> type(greet)
function
>>> greet(["Alice", "Bob"])
["Hello, Alice!", "Hello, Bob!"]
>>> greet(greet(["Alice"]))
"Hello, Hello, Alice!!"
>>> greet.__doc__
"Prints a greeting for each person."
>>> greet.__annotations__
{'names': typing.Iterable[str], 'return': list[str]}
This also means the node can be unit tested like any other function.
Running a node
Nodes can be run as follows:
>>> from ordeq import run
>>> run(greet)
Let's break down what happens when a node is run:
- the node inputs (
names
) are loaded and passed to the functiongreet
- the function
greet
is executed - the values returned by
greet
are saved to the node outputs (greetings
).
Running greet
is therefore roughly equivalent to:
>>> names = catalog.names.load()
>>> greetings = greet(names)
>>> catalog.greetings.save(greetings)
Because Ordeq handles the loading and saving of the inputs and outputs, you can focus on the transformation in the node.
Running multiple nodes
Even the simplest data pipelines consist of multiple steps. Usually, one step depends on the output of another step. Let's extend our example with another node that parses the name to greet from a YAML file:
import catalog
@node(
inputs=catalog.invitees,
outputs=catalog.names,
)
def parse_names(invitees: dict) -> list[str]:
"""Parse the names from the invitees data."""
return [invitee["name"] for invitee in invitees]
@node(
inputs=catalog.names,
outputs=catalog.greetings
)
def greet(names: tuple[str, ...]) -> list[str]:
"""Returns a greeting for each person."""
greetings = []
for name in names:
greetings.append(f"Hello, {name}!")
return greetings
from ordeq_files import CSV, Text, YAML
from pathlib import Path
invitees = YAML(path=Path("invitees.yaml"))
names = CSV(path=Path("names.csv"))
greetings = Text(path=Path("greetings.txt"))
Note that parse_names
outputs the names
IO, which is input to the greet
node.
When we run the two nodes together, Ordeq will automatically pass the output of parse_names
to greet
:
>>> run(parse_names, greet)
This is roughly equivalent to:
>>> invitees = catalog.invitees.load()
>>> names = parse_names(invitees)
>>> catalog.names.save(names)
>>> greetings = greet(parsed_names)
>>> catalog.greetings.save(greetings)
As before, Ordeq handles the loading and saving of inputs and outputs. But now, it also takes care of passing the outputs of one node as inputs to another.
Dependency resolution
Ordeq resolves the DAG (Directed Acyclic Graph) of the nodes that are run, ensuring that each node is run in the correct order based on its dependencies. This also means an IO cannot be outputted by more than one node.
Retrieving results
The result of run
is a dictionary containing the data that was loaded or saved by each IO:
>>> result = run(parse_names, greet)
>>> result[catalog.names]
["Abraham", "Adam", "Azul", ...]
>>> result[catalog.greetings]
["Hello, Abraham!", "Hello, Adam!", "Hello, Azul!", ...]
This works much like a cache of the processed data for the duration of the run. Of course, you can also load the data directly from the IOs:
>>> greetings.load()
["Hello, Abraham!", "Hello, Adam!", "Hello, Azul!", ...]
This has the overhead of loading the data from storage, but it can be useful if you want to access the data after the run has completed.
Advanced: node tags
Nodes can be tagged to help organize and filter them.
Tags can be set using the tags
parameter in the @node
decorator:
import catalog
@node(
inputs=catalog.names,
outputs=catalog.greetings,
tags=["size:large"]
)
def greet(names: Iterable[str]) -> None:
"""Returns a greeting for each person."""
greetings = []
for name in names:
greetings.append(f"Hello, {name}!")
return greetings
The tags can be retrieved as follows:
>>> from ordeq.framework import get_node
>>> node = get_node(greet)
>>> node.tags
['size:large']
Tags are currently used by Ordeq extensions such as ordeq-cli-runner
, or ordeq-viz
.
Refer to the documentation of these extensions for more information.
Where to go from here?
- Have a look at the example project to see how nodes are used in practice
- See how to extend inject custom logic with node hooks
- Check out the guide on testing nodes