Kubeflow integration¶
Ordeq allows you to define your data processing pipelines independently of the orchestration tool. This separation of concerns leads to more modular, reusable, and testable code, making it easier to maintain and evolve your data processing workflows over time. This guide demonstrates how to use Ordeq to define a machine learning pipeline and then orchestrate it using Kubeflow.
Example code
The complete example code for this guide is available in the Ordeq GitHub repository.
We will first recreate this example pipeline using Ordeq. Next, we will show how to orchestrate that pipeline using Kubeflow.
The ML pipeline¶
We base our example on a simple machine learning pipeline that trains a K-Nearest Neighbors (KNN) classifier on the Iris dataset. The example is available in the Kubeflow Pipelines documentation. The pipeline consists of the following steps:
- create_dataset: Downloads the Iris dataset from a remote URL and saves it as a local CSV file.
- normalize_dataset: Normalizes the dataset using either Min-Max scaling or Standard scaling.
- train_model: Trains a KNN classifier on the normalized dataset and saves the trained model as a pickle file.
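For context, in the Kubeflow-native version each of these steps is a self-contained dsl.component that does its own download, file handling, and parameter plumbing. The first step looks roughly like the sketch below (paraphrased from the Kubeflow Pipelines documentation; details such as the version pin and CSV options are illustrative):
from kfp import dsl

@dsl.component(packages_to_install=["pandas==1.3.5"])
def create_dataset(iris_dataset: dsl.Output[dsl.Dataset]):
    """Kubeflow-native step: download, parse, and write the CSV by hand."""
    import pandas as pd

    csv_url = "https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data"
    col_names = ["Sepal_Length", "Sepal_Width", "Petal_Length", "Petal_Width", "Labels"]
    df = pd.read_csv(csv_url, names=col_names)
    df.to_csv(iris_dataset.path, index=False)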
Creating the Ordeq pipeline¶
Once the pipeline is written using Ordeq, we can focus on the domain logic without worrying about orchestration details. Below, we go through the three steps one by one.
create_dataset: With Ordeq, copying the CSV from the remote URL to a local file comes nearly for free. This step is reduced to setting up the ordeq_pandas.PandasCSV IOs and defining the node.
import pandas as pd
from ordeq import node
from ml_pipeline import catalog

@node(inputs=catalog.csv_remote, outputs=catalog.csv_local)
def create_dataset(df: pd.DataFrame) -> pd.DataFrame:
    """Copies the remote iris dataset to a local CSV file."""
    return df
from pathlib import Path
from ordeq_pandas import PandasCSV

DATA_URL = (
    "https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data"
)
columns = [
    "Sepal_Length",
    "Sepal_Width",
    "Petal_Length",
    "Petal_Width",
    "Labels",
]
csv_remote = PandasCSV(path=DATA_URL).with_load_options(names=columns)
csv_local = PandasCSV(path=Path("data/raw/iris_dataset.csv"))
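With the IOs and the node defined, this step can already be exercised on its own, for example (a small sketch; run is called here with a single node, just as the Kubeflow components below call it with a node plus IO overrides):
from ordeq import run
from ml_pipeline import preprocessing

# Runs only create_dataset: loads the remote CSV and saves it to data/raw/
run(preprocessing.create_dataset)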
normalize_dataset: Now that Ordeq handles the IOs, we observe that the standard_scaler and min_max_scaler boolean flags could be simplified to a single scaler parameter that takes a string value. This makes the node definition cleaner and easier to understand.
from typing import Literal

import pandas as pd
from ordeq import node
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from ml_pipeline import catalog

@node(
    inputs=[catalog.csv_local, catalog.scaler], outputs=catalog.csv_normalized
)
def normalize_dataset(
    df: pd.DataFrame, scaler: Literal["min_max", "standard"] | None = None
) -> pd.DataFrame:
    """Normalizes the feature columns with the requested scaler."""
    labels = df.pop("Labels")
    match scaler:
        case "standard":
            scaler_obj = StandardScaler()
        case "min_max":
            scaler_obj = MinMaxScaler()
        case _:
            raise ValueError("Scaler must be either 'standard' or 'min_max'")
    df = pd.DataFrame(scaler_obj.fit_transform(df))
    df["Labels"] = labels
    return df
csv_normalized = PandasCSV(
path=Path("data/processed/normalized_iris_dataset.csv")
)
scaler = Input[str]("standard")
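Because the scaler is a regular Input, it can be overridden per run without touching the node, which is exactly what the Kubeflow wrapper does further below. A minimal local sketch (it assumes create_dataset has already produced the local CSV):
from ordeq import run
from ml_pipeline import catalog, preprocessing

# Re-run the normalization with Min-Max scaling instead of the catalog default
run(preprocessing.normalize_dataset, io={catalog.scaler: "min_max"})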
train_model: Similarly, the model training node is simplified by removing the IO handling code, allowing us to focus solely on the model training logic. For example, the random state that was previously hardcoded is now passed as a parameter to the node, so the split is reproducible when needed but random by default. This is a key example of how orchestration-specific code can interfere with the domain logic, in this case the soundness of the ML model training.
import pandas as pd
from ordeq import node
from sklearn.base import BaseEstimator
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
from ml_pipeline import catalog
@node(
inputs=[catalog.csv_normalized, catalog.n_neighbors, catalog.seed],
outputs=catalog.knn_model,
)
def train_model(
df: pd.DataFrame, n_neighbors: int, seed: int | None = None
) -> BaseEstimator:
y = df.pop("Labels")
X = df
X_train, _X_test, y_train, _y_test = train_test_split(
X, y, random_state=seed
)
clf = KNeighborsClassifier(n_neighbors=n_neighbors)
clf.fit(X_train, y_train)
return clf
knn_model = Pickle(path=Path("data/models/knn_model.pkl"))
n_neighbors = Input[int](3)
seed = Input[int | None](0)
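The hyperparameters are plain Inputs as well, so a small local sweep over n_neighbors can reuse the same override mechanism that the Kubeflow ParallelFor loop uses further below. A sketch (the per-model output paths are illustrative):
from dataclasses import replace
from pathlib import Path
from ordeq import run
from ml_pipeline import catalog, train

# Hypothetical local sweep, mirroring the ParallelFor in the Kubeflow pipeline
for n in [3, 6, 9]:
    run(
        train.train_model,
        io={
            catalog.n_neighbors: n,
            catalog.seed: 0,
            catalog.knn_model: replace(
                catalog.knn_model, path=Path(f"data/models/knn_model_{n}.pkl")
            ),
        },
    )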
Running the pipeline locally¶
With the pipeline defined using Ordeq, we can now run it locally. This is as simple as calling the run function on the pipeline package:
import ml_pipeline
from ordeq import run
if __name__ == "__main__":
run(ml_pipeline)
Apart from the readability and maintainability improvements in the pipeline code, we also get a nice visualization of the pipeline structure when running it locally:
graph TB
subgraph legend["Legend"]
direction LR
node_type@{shape: rounded, label: "Node"}
io_type_0@{shape: rect, label: "Input"}
io_type_1@{shape: rect, label: "PandasCSV"}
io_type_2@{shape: rect, label: "Pickle"}
end
subgraph pipeline["Pipeline"]
direction TB
ml_pipeline.catalog:csv_remote --> ml_pipeline.preprocessing:create_dataset
ml_pipeline.preprocessing:create_dataset --> ml_pipeline.catalog:csv_local
ml_pipeline.catalog:csv_local --> ml_pipeline.preprocessing:normalize_dataset
ml_pipeline.catalog:scaler --> ml_pipeline.preprocessing:normalize_dataset
ml_pipeline.preprocessing:normalize_dataset --> ml_pipeline.catalog:csv_normalized
ml_pipeline.catalog:csv_normalized --> ml_pipeline.train:train_model
ml_pipeline.catalog:n_neighbors --> ml_pipeline.train:train_model
ml_pipeline.catalog:seed --> ml_pipeline.train:train_model
ml_pipeline.train:train_model --> ml_pipeline.catalog:knn_model
ml_pipeline.preprocessing:create_dataset@{shape: rounded, label: "create_dataset"}
ml_pipeline.preprocessing:normalize_dataset@{shape: rounded, label: "normalize_dataset"}
ml_pipeline.catalog:csv_local@{shape: rect, label: "csv_local"}
ml_pipeline.train:train_model@{shape: rounded, label: "train_model"}
ml_pipeline.catalog:csv_normalized@{shape: rect, label: "csv_normalized"}
ml_pipeline.catalog:csv_remote@{shape: rect, label: "csv_remote"}
ml_pipeline.catalog:knn_model@{shape: rect, label: "knn_model"}
ml_pipeline.catalog:n_neighbors@{shape: rect, label: "n_neighbors"}
ml_pipeline.catalog:scaler@{shape: rect, label: "scaler"}
ml_pipeline.catalog:seed@{shape: rect, label: "seed"}
end
class node_type,ml_pipeline.preprocessing:create_dataset,ml_pipeline.preprocessing:normalize_dataset,ml_pipeline.train:train_model node
class io_type_0,ml_pipeline.catalog:n_neighbors,ml_pipeline.catalog:scaler,ml_pipeline.catalog:seed io0
class io_type_1,ml_pipeline.catalog:csv_local,ml_pipeline.catalog:csv_normalized,ml_pipeline.catalog:csv_remote io1
class io_type_2,ml_pipeline.catalog:knn_model io2
classDef node fill:#008AD7,color:#FFF
classDef io fill:#FFD43B
classDef io0 fill:#66c2a5
classDef io1 fill:#fc8d62
classDef io2 fill:#8da0cb
style pipeline fill:transparent
Running the pipeline with Kubeflow¶
The final step is to orchestrate the pipeline using Kubeflow. Without losing Kubeflow-specific functionality, such as the "Output" and "Dataset" artifact types, we can define the Kubeflow pipeline by wrapping the Ordeq nodes in Kubeflow components.
Many roads lead to Rome
There are many ways to map Ordeq nodes to Kubeflow components. This is just one example of how it can be done. The key point is that the core logic of the nodes remains unchanged, and only the orchestration-specific code is added around it.
For example, we can define the Kubeflow components as follows:
from dataclasses import replace
from kfp import client, dsl
from ml_pipeline import catalog, preprocessing, train
from ordeq import run
@dsl.component(packages_to_install=["pandas==1.3.5"])
def create_dataset(iris_dataset: dsl.Output[dsl.Dataset]):
run(
preprocessing.create_dataset,
io={
catalog.csv_local: replace(
catalog.csv_local, path=iris_dataset.path
)
},
)
@dsl.component(packages_to_install=["pandas==1.3.5", "scikit-learn==1.0.2"])
def normalize_dataset(
input_iris_dataset: dsl.Input[dsl.Dataset],
normalized_iris_dataset: dsl.Output[dsl.Dataset],
scaler: str,
):
run(
preprocessing.normalize_dataset,
io={
catalog.csv_local: replace(
catalog.csv_local, path=input_iris_dataset.path
),
catalog.csv_normalized: replace(
catalog.csv_normalized, path=normalized_iris_dataset.path
),
catalog.scaler: scaler,
},
)
@dsl.component(packages_to_install=["pandas==1.3.5", "scikit-learn==1.0.2"])
def train_model(
normalized_iris_dataset: dsl.Input[dsl.Dataset],
model: dsl.Output[dsl.Model],
n_neighbors: int,
seed: int,
):
run(
train.train_model,
io={
catalog.csv_normalized: replace(
catalog.csv_normalized, path=normalized_iris_dataset.path
),
catalog.knn_model: replace(catalog.knn_model, path=model.path),
catalog.n_neighbors: n_neighbors,
catalog.seed: seed,
},
)
@dsl.pipeline(name="iris-training-pipeline")
def my_pipeline(scaler: str, neighbors: list[int], seed: int):
create_dataset_task = create_dataset()
normalize_dataset_task = normalize_dataset(
input_iris_dataset=create_dataset_task.outputs["iris_dataset"],
scaler=scaler,
)
with dsl.ParallelFor(neighbors) as n_neighbors:
train_model(
normalized_iris_dataset=normalize_dataset_task.outputs[
"normalized_iris_dataset"
],
n_neighbors=n_neighbors,
seed=seed,
)
if __name__ == "__main__":
endpoint = "<KFP_UI_URL>"
kfp_client = client.Client(host=endpoint)
pipeline_run = kfp_client.create_run_from_pipeline_func(
my_pipeline,
arguments={"scaler": "min_max", "neighbors": [3, 6, 9], "seed": 0},
)
url = f"{endpoint}/#/runs/details/{pipeline_run.run_id}"
print(url)
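Instead of submitting the run directly from Python, the pipeline can also be compiled to Kubeflow's IR YAML and deployed through the UI or CI/CD. A minimal sketch using the standard KFP compiler (the output file name is arbitrary):
from kfp import compiler

# Compile the pipeline definition to a reusable YAML package
compiler.Compiler().compile(
    pipeline_func=my_pipeline, package_path="iris_training_pipeline.yaml"
)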
Since the engineer defining the Kubeflow pipeline does not need to worry about the core logic of the nodes, they can focus on the orchestration-specific aspects.
For example, the Python dependencies for each component might be automatically taken from the project's packaging configuration (e.g., pyproject.toml or requirements.txt), rather than being hardcoded in the Kubeflow component definitions.
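A hypothetical helper for this could read the pins from a requirements.txt file at pipeline-definition time and feed them to the component decorator (the file name and helper are illustrative, not part of the example repository):
from pathlib import Path
from kfp import dsl

def component_requirements() -> list[str]:
    """Hypothetical helper: read pinned dependencies from requirements.txt."""
    lines = Path("requirements.txt").read_text().splitlines()
    return [ln.strip() for ln in lines if ln.strip() and not ln.startswith("#")]

@dsl.component(packages_to_install=component_requirements())
def create_dataset(iris_dataset: dsl.Output[dsl.Dataset]):
    ...  # component body unchanged from the example above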