Checks
A check is a special type of node that is triggered once certain IOs are loaded or saved. Checks are triggered before other nodes are run. This can be useful for checking data, like enforcing constraints or running data quality tests. For example:
- Validating that data conforms to a specific schema (e.g., ensuring correct data types).
- Enforcing business rules on the data (e.g., ensuring that transaction amounts are positive).
- Profiling data to gather statistics or insights (e.g., calculating outliers).
Checks allow you to inject this logic in your pipelines with minimal code changes.
Checks are in preview
Checks are currently in preview and may change in future releases without prior notice.
Defining checks
Here is an example check that validates that a dataset is not empty:
import polars as pl
from ordeq import node
from ordeq_polars import PolarsEagerCSV

txs = PolarsEagerCSV(path="s3://my-bucket/txs.csv")


@node(inputs=txs, checks=txs)
def validate_row_count(df: pl.DataFrame) -> None:
    # DataFrame.count() returns per-column counts, so test emptiness directly.
    if df.is_empty():
        raise ValueError("No transactions data found!")
Checks are created in the same way as regular nodes, but take a checks parameter.
This parameter specifies the IOs that trigger the execution of the check.
For instance:
@node(checks=[a])
def my_check(): ...
will run my_check after a is loaded or saved.
Analogously:
@node(checks=[a, b])
def my_check(): ...
will run my_check after both a and b are loaded or saved.
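The trigger rule above can be pictured with a toy model. The sketch below is plain Python and does not use Ordeq. The `ready_checks` helper and the check names are hypothetical, and the code only illustrates the rule that a check with several trigger IOs becomes runnable once all of them have been loaded or saved.

```python
# Toy model only: this is NOT how Ordeq schedules checks internally.
def ready_checks(triggers: dict[str, set[str]], touched: set[str]) -> list[str]:
    """Return the checks whose trigger IOs have all been loaded or saved.

    triggers maps a (hypothetical) check name to the IOs in its `checks` parameter.
    touched is the set of IOs that have been loaded or saved so far.
    """
    return sorted(name for name, ios in triggers.items() if ios <= touched)


triggers = {"check_a": {"a"}, "check_ab": {"a", "b"}}

print(ready_checks(triggers, {"a"}))       # ['check_a']
print(ready_checks(triggers, {"a", "b"}))  # ['check_a', 'check_ab']
```

In this model, `check_ab` stays pending while only `a` has been touched, which matches the "both a and b" wording above.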
Checks with inputs
Like any node, a check can take inputs. The inputs can be the same IOs that trigger the check, or different ones. This allows you to validate the data with additional IOs. For example, you can perform a check on one IO, using another IO as input:
from pathlib import Path

from ordeq_files import JSON

txs_schema = JSON(path=Path("schemas/txs.json"))


@node(inputs=[txs, txs_schema], checks=txs)  # (1)
def validate_schema(df: pl.DataFrame, schema: dict) -> None:
    if df.columns != schema["columns"]:
        raise ValueError("Transactions data does not conform to schema!")
1. The check is triggered by the PolarsEagerCSV, but also takes a JSON as input.
This is useful if the check logic requires additional data to perform the validation. In the example above, this additional data is metadata (the schema), but it could also be other actual data:
from ordeq import node
from ordeq_polars import PolarsEagerCSV

txs_countries = PolarsEagerCSV(path="s3://my-bucket/valid-countries.csv")


@node(inputs=[txs, txs_countries], checks=txs)  # (1)
def validate_txs(df: pl.DataFrame, countries: pl.DataFrame) -> None:
    # The anti-join keeps only transactions whose country code has no match.
    if df.join(countries, on="country_code", how="anti").height > 0:
        raise ValueError("Transactions data contains invalid country codes!")
1. The check is triggered by the PolarsEagerCSV, but also takes another PolarsEagerCSV as input.
The check above validates that all country codes in the transactions data are valid by using an additional dataset of valid country codes.
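To make the anti-join in this check concrete, here is a rough stdlib-only sketch of the same idea on plain Python rows. It does not use Polars or Ordeq. The `rows_with_invalid_country` helper and the sample rows are hypothetical and exist only for illustration.

```python
def rows_with_invalid_country(
    txs: list[dict[str, str]], valid_codes: set[str]
) -> list[dict[str, str]]:
    """Return the transactions whose country code is not in valid_codes.

    This mirrors an anti-join of the transactions against the table of
    valid country codes: only rows without a match are kept.
    """
    return [row for row in txs if row["country_code"] not in valid_codes]


# Hypothetical sample data.
txs_rows = [
    {"id": "t1", "country_code": "NL"},
    {"id": "t2", "country_code": "XX"},
]

invalid_rows = rows_with_invalid_country(txs_rows, {"NL", "US"})
print(invalid_rows)  # [{'id': 't2', 'country_code': 'XX'}]
```

A non-empty result corresponds to the case in which the check raises its error.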
Checks with outputs
Checks can also produce outputs. This is useful if you want to store the results of the check for later use, like analysis or reporting. For example, you can create a check that profiles the data and saves the results to a CSV file:
import polars as pl
from ordeq import node
from ordeq_polars import PolarsEagerCSV

txs = PolarsEagerCSV(path="s3://my-bucket/txs.csv")
txs_profile = PolarsEagerCSV(path="s3://my-bucket/txs_profile.csv")


@node(inputs=txs, outputs=txs_profile, checks=txs)  # (1)
def describe_txs(df: pl.DataFrame) -> pl.DataFrame:
    return df.describe()
1. The check is triggered by the PolarsEagerCSV, and produces another PolarsEagerCSV as output.
It is also possible to reuse an output in other nodes:
import polars as pl
from ordeq import node
from ordeq_polars import PolarsEagerCSV

txs = PolarsEagerCSV(path="s3://my-bucket/txs.csv")
txs_invalid = PolarsEagerCSV(path="s3://my-bucket/txs-invalid-countries.csv")


@node(inputs=txs, outputs=txs_invalid, checks=txs)  # (1)
def filter_invalid(df: pl.DataFrame) -> pl.DataFrame:
    valid_codes = ["US", "GB", "DE", "NL"]
    return df.filter(~pl.col("country_code").is_in(valid_codes))


@node(inputs=txs_invalid, checks=txs)  # (2)
def check_invalid(invalid: pl.DataFrame) -> None:
    # Use the row count (height); DataFrame.count() returns per-column counts.
    if invalid.height > 0:
        raise ValueError(
            f"Found {invalid.height} transactions with invalid country codes!"
        )
1. The first check filters invalid country codes and produces an output with the invalid rows.
2. The second check uses that output to raise an error if any invalid rows were found.
This is useful since the output of filter_invalid can be inspected later, even if check_invalid raises an error.
It also means you can build complex validations using checks that depend on other checks.
Running checks
Checks are automatically run when the IOs that trigger them are loaded or saved. For example, consider the following pipeline:
# Pipeline node
import catalog
import polars as pl
from ordeq import node


@node(inputs=catalog.txs, outputs=catalog.txs_aggregated)
def aggregate_txs(txs: pl.DataFrame) -> pl.DataFrame:
    return txs.group_by("client_id").sum()


# Checks on txs
import catalog
import polars as pl
from ordeq import node


@node(inputs=catalog.txs, outputs=catalog.txs_invalid, checks=catalog.txs)
def filter_invalid(df: pl.DataFrame) -> pl.DataFrame:
    valid_codes = ["US", "GB", "DE", "NL"]
    return df.filter(~pl.col("country_code").is_in(valid_codes))


@node(inputs=catalog.txs_invalid, checks=catalog.txs)
def check_invalid(invalid: pl.DataFrame) -> None:
    # Use the row count (height); DataFrame.count() returns per-column counts.
    if invalid.height > 0:
        raise ValueError(
            f"Found {invalid.height} transactions with invalid country codes!"
        )


# Catalog (imported as `catalog`)
from ordeq_polars import PolarsEagerCSV

txs = PolarsEagerCSV(path="s3://my-bucket/txs.csv")
txs_invalid = PolarsEagerCSV(path="s3://my-bucket/txs-invalid-countries.csv")
txs_aggregated = PolarsEagerCSV(path="s3://my-bucket/txs-aggregated.csv")


# Entry point
import pipeline
from ordeq import run

if __name__ == "__main__":
    run(pipeline)
When the pipeline is run, the following happens:
1. txs is loaded (as usual)
2. filter_invalid is run (since it is a check on txs)
3. check_invalid is run (since it is a check on txs)
4. If check_invalid passes without errors, node aggregate_txs is run
5. txs_aggregated is saved (as usual)
That means checks can be added to existing pipelines without modifying existing code. You only need to define the checks in your project as shown above. You can run your pipeline as before, and Ordeq will take care of running the checks at the appropriate times.
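The execution order listed above can be modelled with a small sketch that does not use Ordeq. The `simulate_run` function and the step labels are hypothetical. The sketch also assumes that a failing check stops the run before aggregate_txs, which the step list implies but does not spell out.

```python
# Toy model of the run order; this is NOT Ordeq's runner.
VALID_CODES = {"US", "GB", "DE", "NL"}


def simulate_run(rows: list[dict[str, str]]) -> list[str]:
    """Return the ordered steps executed for the given transaction rows."""
    steps = ["load txs"]

    # Checks on txs run before any regular node that consumes txs.
    steps.append("run filter_invalid")
    invalid = [row for row in rows if row["country_code"] not in VALID_CODES]

    steps.append("run check_invalid")
    if invalid:
        # Assumption: an error raised by a check halts the pipeline here.
        steps.append("stop: check_invalid raised")
        return steps

    steps.append("run aggregate_txs")
    steps.append("save txs_aggregated")
    return steps


print(simulate_run([{"country_code": "NL"}]))
print(simulate_run([{"country_code": "XX"}]))
```

With only valid country codes, all five steps from the list appear in order. With an invalid code, the model stops after check_invalid and never reaches aggregate_txs.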
Questions or feedback?
If you have any questions or feedback about checks or this guide, please open an issue on GitHub.