
ordeq_spark

SparkCSV dataclass

Bases: IO[DataFrame]

IO for loading and saving CSV using Spark.

Example:

>>> from ordeq_spark import SparkCSV
>>> csv = SparkCSV(
...     path="to.csv"
... ).with_load_options(
...     infer_schema=True
... )

By default, Spark creates a directory on save. Use the single_file save option to write a single file instead:

>>> from ordeq_spark import SparkCSV
>>> csv = SparkCSV(
...     path="to.csv"
... ).with_save_options(single_file=True)
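
Loading and saving then follow the usual IO pattern (a sketch; assumes an active Spark session and that the path exists when loading):

>>> df = csv.load()  # doctest: +SKIP
>>> csv.save(df)  # doctest: +SKIP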

SparkDataFrame dataclass

Bases: Input[DataFrame]

Allows a Spark DataFrame to be hard-coded in Python. This is suitable for small tables, such as simple dimension tables that are unlikely to change. It may also be useful in unit testing.

Example usage:

>>> from pyspark.sql.types import StructType, StructField, IntegerType, StringType
>>> from ordeq_spark import SparkDataFrame
>>> df = SparkDataFrame(
...     schema=StructType([
...         StructField("year", IntegerType()),
...         StructField("datafile", StringType()),
...     ]),
...     data=(
...         (2022, "file_2022.xlsx"),
...         (2023, "file_2023.xlsx"),
...         (2024, "file_2024.xlsx"),
...     )
... )
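
Like other inputs, calling load materializes the hard-coded data as a Spark DataFrame (a sketch; assumes an active Spark session):

>>> spark_df = df.load()  # doctest: +SKIP
>>> spark_df.show()  # doctest: +SKIP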

SparkExplainHook

Bases: OutputHook[DataFrame]

Hook to print the Spark execution plan before saving a DataFrame.
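
Example usage (a sketch mirroring the SparkJobGroupHook example further down; it assumes SparkExplainHook is importable from ordeq_spark like the other classes on this page):

>>> from ordeq import node, run
>>> from ordeq_spark import SparkExplainHook, SparkHiveTable
>>> from pyspark.sql import DataFrame

>>> @node(
...     inputs=SparkHiveTable(table="tables.a"),
...     outputs=SparkHiveTable(table="tables.b"),
... )
... def append(a: DataFrame) -> DataFrame:
...     return a.union(a)

>>> run(append, hooks=[SparkExplainHook()])  # doctest: +SKIP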

SparkGlobalTempView dataclass

Bases: IO[DataFrame]

IO for reading from and writing to Spark global temporary views.

Examples:

Create and save a DataFrame to a global temp view:

>>> from ordeq_spark import SparkGlobalTempView
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.getOrCreate()  # doctest: +SKIP
>>> view = SparkGlobalTempView(table="my_temp_view")
>>> df = spark.createDataFrame(
...     [(1, "Alice"), (2, "Bob")], ["id", "name"]
... )  # doctest: +SKIP
>>> view.save(df, mode="createOrReplace")  # doctest: +SKIP

Load the DataFrame from the global temp view:

>>> loaded_df = view.load()  # doctest: +SKIP
>>> loaded_df.show()  # doctest: +SKIP

SparkHiveTable dataclass

Bases: SparkTable, IO[DataFrame]

IO for reading from and writing to Hive tables in Spark.

Examples:

Save a DataFrame to a Hive table:

>>> from ordeq_spark import SparkHiveTable
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.enableHiveSupport().getOrCreate()  # doctest: +SKIP
>>> table = SparkHiveTable(table="my_hive_table")
>>> df = spark.createDataFrame(
...     [(1, "Alice"), (2, "Bob")], ["id", "name"]
... )  # doctest: +SKIP
>>> table.save(df, format="parquet", mode="overwrite")  # doctest: +SKIP
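
Loading follows the same pattern; the table is presumably expected to exist:

>>> df_loaded = table.load()  # doctest: +SKIP
>>> df_loaded.show()  # doctest: +SKIP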

SparkIcebergTable dataclass

Bases: SparkTable, IO[DataFrame]

IO for loading and saving Iceberg tables using Spark.

Example usage:

>>> from ordeq_spark import (
...     SparkIcebergTable
... )
>>> from pyspark.sql.types import StructType, StructField, IntegerType
>>> my_table = SparkIcebergTable(
...     table="my.iceberg.table",
...     schema=StructType(
...         fields=[
...             StructField("id", IntegerType()),
...             StructField("amount", IntegerType()),
...         ]
...     )
... )

>>> import pyspark.sql.functions as F
>>> my_partitioned_table = (
...     SparkIcebergTable(
...         table="my.iceberg.table"
...     ).with_save_options(
...         mode="overwritePartitions",
...         partition_by=(
...             ("colour",),
...             (F.years, "dt"),
...         )
...     )
... )

Saving is idempotent: if the target table does not exist, it is created with the configuration set in the save options.

Table properties can be specified via the properties attribute. Currently, properties are only taken into account on write.

>>> table_with_properties = SparkIcebergTable(
...     table="my.iceberg.table",
...     properties=(
...         ('read.split.target-size', '268435456'),
...         ('write.parquet.row-group-size-bytes', '268435456'),
...     )
... )

Currently, only a subset of Iceberg write modes is supported.

save(df, mode='overwritePartitions', partition_by=())

Saves the DataFrame to the Iceberg table.

Parameters:

df (DataFrame), required:
    DataFrame to save.

mode (SparkIcebergWriteMode), default 'overwritePartitions':
    Write mode, one of:
    - "create": create the table, fail if it exists
    - "createOrReplace": create the table, replace if it exists
    - "overwrite": overwrite the table
    - "overwritePartitions": overwrite partitions of the table
    - "append": append to the table

partition_by (SparkIcebergPartitionType), default ():
    Columns to partition by. Can be:
    - a single column name as a string, e.g. "colour"
    - a list of column names, e.g. ["colour", "dt"]
    - a tuple of tuples for more complex partitioning, e.g. (("colour",), (F.years, "dt"))

Raises:

ValueError: if mode is not one of the supported modes.
RuntimeError: if the Spark captured exception cannot be parsed.
CapturedException: if there is an error during the write operation.
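
For reference, a direct save call using the signature above (a sketch; assumes df is an existing Spark DataFrame compatible with the table schema):

>>> my_table.save(
...     df,
...     mode="append",
...     partition_by="colour",
... )  # doctest: +SKIP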

SparkJSON dataclass

Bases: IO[DataFrame]

IO for loading and saving JSON using Spark.

Example:

>>> from ordeq_spark import SparkJSON
>>> json = SparkJSON(
...     path="to.json"
... )

By default, Spark creates a directory on save. Use the single_file save option to write a single file instead:

>>> from ordeq_spark import SparkJSON
>>> json = SparkJSON(
...     path="to.json"
... ).with_save_options(single_file=True)

SparkJobGroupHook

Bases: NodeHook

Node hook that sets the Spark job group to the node name. Please make sure the Spark session is initialized before using this hook.

Example usage:

>>> from ordeq import node, run
>>> from ordeq_spark import SparkHiveTable
>>> from pyspark.sql import DataFrame

>>> @node(
...     inputs=SparkHiveTable(table="tables.a"),
...     outputs=SparkHiveTable(table="tables.b"),
... )
... def append(a: DataFrame) -> DataFrame:
...     return a.union(a)

>>> run(append, hooks=[SparkJobGroupHook()]) # doctest: +SKIP

before_node_run(node)

Sets the node name as the job group in the Spark context. This makes the Spark history server much easier to navigate.

Parameters:

node (Node), required:
    The node.

Raises:

RuntimeError: if the Spark session is not active.
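
For reference, the hook's effect can be reproduced manually with pyspark's SparkContext.setJobGroup (a sketch of the underlying mechanism, not necessarily the hook's exact implementation):

>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.getActiveSession()  # doctest: +SKIP
>>> spark.sparkContext.setJobGroup(
...     "append", "Set by SparkJobGroupHook"
... )  # doctest: +SKIP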

SparkParquet dataclass

Bases: IO[DataFrame]

IO for loading and saving Parquet files using Spark.

Basic usage:

>>> from ordeq_spark import SparkParquet
>>> parquet = SparkParquet(path="data.parquet")
>>> df = parquet.load()  # doctest: +SKIP
>>> parquet.save(df)  # doctest: +SKIP

Loading with options:

>>> df = parquet.load(
...     modifiedBefore="2050-07-01T08:30:00"
... )  # doctest: +SKIP

Saving with options:

>>> parquet.save(
...     df,
...     mode="overwrite",
...     compression="brotli"
... ) # doctest: +SKIP

load(**load_options)

Loads a Parquet file into a Spark DataFrame.

Parameters:

load_options (Any), default {}:
    Additional options to pass to Spark's read.parquet method.

Returns:

DataFrame: A Spark DataFrame containing the data from the Parquet file.

save(df, **save_options)

Saves a Spark DataFrame to a Parquet file.

Parameters:

df (DataFrame), required:
    The Spark DataFrame to save.

save_options (Any), default {}:
    Additional options to pass to Spark's write.parquet method.

SparkSession dataclass

Bases: Input[SparkSession]

Input representing the active Spark session. Useful for accessing it from within nodes.

Example:

>>> from ordeq_spark.io.session import SparkSession
>>> spark_session = SparkSession()
>>> spark = spark_session.load()  # doctest: +SKIP
>>> print(spark.version)  # doctest: +SKIP
3.3.1

Example in a node:

>>> import pyspark.sql
>>> from ordeq import node, Input
>>> items = Input[dict]({'id': [1, 2, 3], 'value': ['a', 'b', 'c']})
>>> @node(
...     inputs=[items, spark_session],
...     outputs=[],
... )
... def convert_to_df(
...     data: dict, spark: pyspark.sql.SparkSession
... ) -> pyspark.sql.DataFrame:
...     return spark.createDataFrame(data)

load()

Gets the active SparkSession. Raises an error if there is no active Spark session.

Returns:

SparkSession: The active pyspark.sql.SparkSession.

SparkTempView dataclass

Bases: IO[DataFrame]

IO for reading from and writing to Spark temporary views.

Examples:

Create and save a DataFrame to a temp view:

>>> from ordeq_spark import SparkTempView
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.getOrCreate()  # doctest: +SKIP
>>> view = SparkTempView(table="my_temp_view")
>>> df = spark.createDataFrame(
...     [(1, "Alice"), (2, "Bob")], ["id", "name"]
... )  # doctest: +SKIP
>>> view.save(df, mode="createOrReplace")  # doctest: +SKIP

Load the DataFrame from the temp view:

>>> loaded_df = view.load()  # doctest: +SKIP
>>> loaded_df.show()  # doctest: +SKIP