ordeq_spark
SparkCSV
dataclass
¶
Bases: IO[DataFrame]
IO for loading and saving CSV using Spark.
Example:
>>> from ordeq_spark import SparkCSV
>>> csv = SparkCSV(
... path="to.csv"
... ).with_load_options(
... infer_schema=True
... )
By default, Spark creates a directory on save.
Use single_file if you want to write to a file instead:
>>> from ordeq_spark import SparkCSV
>>> csv = SparkCSV(
... path="to.csv"
... ).with_save_options(single_file=True)
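As with the other IOs, loading and saving can also be called directly. A minimal sketch, assuming an active Spark session and a CSV file at the given path:
>>> df = csv.load() # doctest: +SKIP
>>> csv.save(df) # doctest: +SKIP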
SparkDataFrame
dataclass
¶
Bases: Input[DataFrame]
Allows a Spark DataFrame to be hard-coded in Python. This is suitable for small tables, such as simple dimension tables that are unlikely to change. It can also be useful in unit testing.
Example usage:
>>> from pyspark.sql.types import *
>>> from ordeq_spark import SparkDataFrame
>>> df = SparkDataFrame(
... schema=StructType([
... StructField("year", IntegerType()),
... StructField("datafile", StringType()),
... ]),
... data=(
... (2022, "file_2022.xlsx"),
... (2023, "file_2023.xlsx"),
... (2024, "file_2024.xlsx"),
... )
... )
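Calling load should yield the DataFrame defined by the schema and data above. A minimal sketch, assuming an active Spark session:
>>> frame = df.load() # doctest: +SKIP
>>> frame.show() # doctest: +SKIP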
SparkExplainHook
¶
SparkGlobalTempView
dataclass
¶
Bases: IO[DataFrame]
IO for reading from and writing to Spark global temporary views.
Examples:
Create and save a DataFrame to a global temp view:
>>> from ordeq_spark import SparkGlobalTempView
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.getOrCreate() # doctest: +SKIP
>>> view = SparkGlobalTempView(table="my_temp_view")
>>> df = spark.createDataFrame(
... [(1, "Alice"), (2, "Bob")], ["id", "name"]
... ) # doctest: +SKIP
>>> view.save(df, mode="createOrReplace") # doctest: +SKIP
Load the DataFrame from the global temp view:
>>> loaded_df = view.load() # doctest: +SKIP
>>> loaded_df.show() # doctest: +SKIP
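Spark registers global temporary views in the global_temp database, so the saved view should also be reachable from plain SQL (assuming the view name matches the table argument):
>>> spark.sql("SELECT * FROM global_temp.my_temp_view").show() # doctest: +SKIP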
SparkHiveTable
dataclass
¶
Bases: SparkTable, IO[DataFrame]
IO for reading from and writing to Hive tables in Spark.
Examples:
Save a DataFrame to a Hive table:
>>> from ordeq_spark import SparkHiveTable
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.enableHiveSupport().getOrCreate() # doctest: +SKIP
>>> table = SparkHiveTable(table="my_hive_table")
>>> df = spark.createDataFrame(
... [(1, "Alice"), (2, "Bob")], ["id", "name"]
... ) # doctest: +SKIP
>>> table.save(df, format="parquet", mode="overwrite") # doctest: +SKIP
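Loading reads the table back into a DataFrame. A minimal sketch, assuming the table exists in the Hive metastore:
>>> df = table.load() # doctest: +SKIP
>>> df.show() # doctest: +SKIP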
SparkIcebergTable
dataclass
¶
Bases: SparkTable, IO[DataFrame]
IO for loading and saving Iceberg tables using Spark.
Example usage:
>>> from ordeq_spark import (
... SparkIcebergTable
... )
>>> from pyspark.sql.types import StructType, StructField, IntegerType
>>> my_table = SparkIcebergTable(
... table="my.iceberg.table",
... schema=StructType(
... fields=[
... StructField("id", IntegerType()),
... StructField("amount", IntegerType()),
... ]
... )
... )
>>> import pyspark.sql.functions as F
>>> my_partitioned_table = (
... SparkIcebergTable(
... table="my.iceberg.table"
... ).with_save_options(
... mode="overwritePartitions",
... partition_by=(
... ("colour",),
... (F.years, "dt"),
... )
... )
... )
Saving is idempotent: if the target table does not exist, it is created with the configuration set in the save options.
Table properties can be specified on the properties attribute.
Currently, the properties are only taken into account on write:
>>> table_with_properties = SparkIcebergTable(
... table="my.iceberg.table",
... properties=(
... ('read.split.target-size', '268435456'),
... ('write.parquet.row-group-size-bytes', '268435456'),
... )
... )
Currently, only a subset of Iceberg write modes is supported.
save(df, mode='overwritePartitions', partition_by=())
¶
Saves the DataFrame to the Iceberg table.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `df` | `DataFrame` | DataFrame to save | *required* |
| `mode` | `SparkIcebergWriteMode` | Write mode, one of: `"create"` (create the table, fail if it exists), `"createOrReplace"` (create the table, replace if it exists), `"overwrite"` (overwrite the table), `"overwritePartitions"` (overwrite partitions of the table), `"append"` (append to the table) | `'overwritePartitions'` |
| `partition_by` | `SparkIcebergPartitionType` | Columns to partition by. Can be a single column name as a string, e.g. `"colour"`; a list of column names, e.g. `["colour", "dt"]`; or a tuple of tuples for more complex partitioning, e.g. `(("colour",), (F.years, "dt"))` | `()` |
Raises:

| Type | Description |
|---|---|
| `ValueError` | If `mode` is not one of the supported modes |
| `RuntimeError` | If the Spark captured exception cannot be parsed |
| `CapturedException` | If there is an error during the write operation |
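For example, passing a mode outside the supported set should raise a ValueError before anything is written. Illustrative only: df is any Spark DataFrame, and "truncate" is an intentionally unsupported value:
>>> my_table.save(df, mode="truncate") # doctest: +SKIP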
SparkJSON
dataclass
¶
Bases: IO[DataFrame]
IO for loading and saving JSON using Spark.
Example:
>>> from ordeq_spark import SparkJSON
>>> json = SparkJSON(
... path="to.json"
... )
By default, Spark creates a directory on save.
Use single_file if you want to write to a file instead:
>>> from ordeq_spark import SparkJSON
>>> json = SparkJSON(
... path="to.json"
... ).with_save_options(single_file=True)
SparkJobGroupHook
¶
Bases: NodeHook
Node hook that sets the Spark job group to the node name. Please make sure the Spark session is initialized before using this hook.
Example usage:
>>> from ordeq import node, run
>>> from ordeq_spark import SparkHiveTable
>>> from pyspark.sql import DataFrame
>>> @node(
... inputs=SparkHiveTable(table="tables.a"),
... outputs=SparkHiveTable(table="tables.b"),
... )
... def append(a: DataFrame) -> DataFrame:
... return a.union(a)
>>> run(append, hooks=[SparkJobGroupHook()]) # doctest: +SKIP
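Setting the job group presumably maps to PySpark's SparkContext.setJobGroup, which groups all jobs triggered by a node under one label in the Spark UI. For illustration only (the group id and description used by the hook are an assumption):
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.getOrCreate() # doctest: +SKIP
>>> spark.sparkContext.setJobGroup(
...     "append", "ordeq node: append"
... ) # doctest: +SKIP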
SparkParquet
dataclass
¶
Bases: IO[DataFrame]
IO for loading and saving Parquet files using Spark.
Basic usage:
>>> from ordeq_spark import SparkParquet
>>> parquet = SparkParquet(path="data.parquet")
>>> df = parquet.load() # doctest: +SKIP
>>> parquet.save(df) # doctest: +SKIP
Loading with options:
>>> df = parquet.load(
... modifiedBefore="2050-07-01T08:30:00"
... ) # doctest: +SKIP
Saving with options:
>>> parquet.save(
... df,
... mode="overwrite",
... compression="brotli"
... ) # doctest: +SKIP
load(**load_options)
¶
Loads a Parquet file into a Spark DataFrame.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `load_options` | `Any` | Additional options to pass to Spark's `read.parquet` method. | `{}` |

Returns:

| Type | Description |
|---|---|
| `DataFrame` | A Spark DataFrame containing the data from the Parquet file. |
save(df, **save_options)
¶
Saves a Spark DataFrame to a Parquet file.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `df` | `DataFrame` | The Spark DataFrame to save. | *required* |
| `save_options` | `Any` | Additional options to pass to Spark's `write.parquet` method. | `{}` |
SparkSession
dataclass
¶
Bases: Input[SparkSession]
Input representing the active Spark session. Useful for accessing the Spark session inside nodes.
Example:
>>> from ordeq_spark.io.session import SparkSession
>>> spark_session = SparkSession()
>>> spark = spark_session.load() # doctest: +SKIP
>>> print(spark.version) # doctest: +SKIP
3.3.1
Example in a node:
>>> import pyspark.sql
>>> from ordeq import node, Input
>>> items = Input[list]([
...     {'id': 1, 'value': 'a'},
...     {'id': 2, 'value': 'b'},
...     {'id': 3, 'value': 'c'},
... ])
>>> @node(
...     inputs=[items, spark_session],
...     outputs=[],
... )
... def convert_to_df(
...     data: list, spark: pyspark.sql.SparkSession
... ) -> pyspark.sql.DataFrame:
...     return spark.createDataFrame(data)
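Running the node then injects both the data and the active session. A minimal sketch, assuming a Spark session has already been started:
>>> from ordeq import run
>>> run(convert_to_df) # doctest: +SKIP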
load()
¶
Gets the active SparkSession. Raises an error if no Spark session is active.
Returns:

| Type | Description |
|---|---|
| `SparkSession` | The active `pyspark.sql.SparkSession`. |
SparkTempView
dataclass
¶
Bases: IO[DataFrame]
IO for reading from and writing to Spark temporary views.
Examples:
Create and save a DataFrame to a temp view:
>>> from ordeq_spark import SparkTempView
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.getOrCreate() # doctest: +SKIP
>>> view = SparkTempView(table="my_temp_view")
>>> df = spark.createDataFrame(
... [(1, "Alice"), (2, "Bob")], ["id", "name"]
... ) # doctest: +SKIP
>>> view.save(df, mode="createOrReplace") # doctest: +SKIP
Load the DataFrame from the temp view:
>>> loaded_df = view.load() # doctest: +SKIP
>>> loaded_df.show() # doctest: +SKIP
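Unlike global temporary views, regular temporary views are scoped to the current Spark session. The saved view should also be queryable from SQL (assuming the view name matches the table argument):
>>> spark.sql("SELECT * FROM my_temp_view").show() # doctest: +SKIP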