Views
Views are a special type of node. They are ideal for expressing common transformations, such as:
- filtering, parsing, casting or selecting data
- converting inputs from one type to another (say, from Pandas to Spark DataFrame)
- selecting a device for model training (say, CPU vs GPU)
Often, we don't want to save the result of these transformations. Instead, we want to pass along the result directly to a subsequent transformation. That's exactly what views achieve.
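For instance, a view that converts a Pandas DataFrame to a Spark DataFrame could look like the following minimal sketch (using the view syntax introduced in the example below; `catalog.clients_pd` and `clients_spark` are made-up names):

```python
import catalog  # assumed to define a Pandas-based input `clients_pd`
import pandas as pd
from ordeq import node
from pyspark.sql import DataFrame, SparkSession


@node(inputs=[catalog.clients_pd])  # no outputs: this node is a view
def clients_spark(clients: pd.DataFrame) -> DataFrame:
    # Convert the Pandas DataFrame into an in-memory Spark DataFrame
    spark = SparkSession.builder.getOrCreate()
    return spark.createDataFrame(clients)
```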
Example
We will slightly adapt the running example from the [introduction]. Suppose we're tasked to:
- filter a Spark DataFrame `txs` by date
- join the filtered DataFrame with `clients`, and output the result to a Hive table
A first attempt at completing this using Ordeq would look roughly like this:
```python
# nodes.py
import catalog
from ordeq import node
from pyspark.sql import DataFrame


@node(inputs=[catalog.txs, catalog.date], outputs=[catalog.txs_filtered])
def filter_by_date(txs: DataFrame, date: str) -> DataFrame:
    return txs.filter(txs.date == date)


@node(
    inputs=[catalog.txs_filtered, catalog.clients],
    outputs=catalog.txs_and_clients,
)
def join_txs_and_clients(txs: DataFrame, clients: DataFrame) -> DataFrame:
    return txs.join(clients, on="client_id", how="left")
```
```python
# catalog.py
from ordeq import IO
from ordeq_args import CommandLineArg
from ordeq_spark import SparkHiveTable

date = CommandLineArg("--date", type=str)
txs = SparkHiveTable(table="txs")
txs_filtered = IO()  # placeholder IO connecting the two nodes
clients = SparkHiveTable(table="clients")
txs_and_clients = SparkHiveTable(table="txs_and_clients")
```
```python
# run script
from nodes import filter_by_date, join_txs_and_clients
from ordeq import run

if __name__ == "__main__":
    run(filter_by_date, join_txs_and_clients)
```
Here's what we have done:
- to keep the transformations modular, we created two nodes: `filter_by_date` and `join_txs_and_clients`
- we defined a placeholder IO `txs_filtered` in `catalog.py`. This IO is not associated with any actual data source; it merely serves to pass the output from one node to the other.
This approach has a couple of downsides:
- the separate placeholder IO seems like overhead
- we have to remember to run both `filter_by_date` and `join_txs_and_clients`, even though `filter_by_date` will never be run individually
- it's not obvious how to reuse `filter_by_date` with different IO, even though it seems a very common transformation (see the sketch below)
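To make the third point concrete, here is a hypothetical sketch of what reusing the same filtering logic for another table would require under this approach. It assumes the imports from the blocks above; `orders`, `orders_filtered` and `filter_orders_by_date` are made up for illustration:

```python
# catalog.py (hypothetical additions)
orders = SparkHiveTable(table="orders")
orders_filtered = IO()  # yet another placeholder IO


# nodes.py (hypothetical near-duplicate of filter_by_date)
@node(inputs=[catalog.orders, catalog.date], outputs=[catalog.orders_filtered])
def filter_orders_by_date(orders: DataFrame, date: str) -> DataFrame:
    return orders.filter(orders.date == date)
```

Every new table needs both a placeholder IO and a copy of the node, even though the transformation itself is unchanged.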
Using views
Using views, we can rewrite the example above as follows:
```python
# nodes.py
import catalog
from ordeq import node
from pyspark.sql import DataFrame


@node(inputs=[catalog.txs, catalog.date])
def txs_filtered(txs: DataFrame, date: str) -> DataFrame:
    return txs.filter(txs.date == date)


@node(inputs=[txs_filtered, catalog.clients], outputs=catalog.txs_and_clients)
def join_txs_and_clients(txs: DataFrame, clients: DataFrame) -> DataFrame:
    return txs.join(clients, on="client_id", how="left")
```
```python
# catalog.py
from ordeq_args import CommandLineArg
from ordeq_spark import SparkHiveTable

date = CommandLineArg("--date", type=str)
txs = SparkHiveTable(table="txs")
clients = SparkHiveTable(table="clients")
txs_and_clients = SparkHiveTable(table="txs_and_clients")
```
```python
# run script
from nodes import join_txs_and_clients
from ordeq import run

if __name__ == "__main__":
    run(join_txs_and_clients)
```
Let's break this down:
- we have defined a view `txs_filtered`. Views have no outputs, but may take inputs. In this case, the inputs are the same as those of the previous `filter_by_date` node.
- we pass the view `txs_filtered` directly as input to the node `join_txs_and_clients`
By setting the view as input to the node, Ordeq passes the result of `txs_filtered` along to `join_txs_and_clients`, without the need for a placeholder IO.
In addition, it automatically runs `txs_filtered` whenever `join_txs_and_clients` is run.
This greatly simplifies the run command.
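Because a view is an ordinary function, it can also be passed as input to several nodes, which addresses the reuse concern raised earlier. A hypothetical sketch, assuming the `nodes.py` above and a made-up `txs_summary` IO defined in `catalog.py`:

```python
# nodes.py (hypothetical addition)
@node(inputs=[txs_filtered], outputs=catalog.txs_summary)
def summarize_txs(txs: DataFrame) -> DataFrame:
    # Reuses the same view with a different downstream IO
    return txs.groupBy("client_id").count()
```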