iceberg.py

SparkIcebergTable dataclass

Bases: SparkTable, IO[DataFrame]

IO used to load & save Iceberg tables using Spark.

Example usage:

>>> from ordeq_spark import SparkIcebergTable
>>> from pyspark.sql.types import StructType, StructField, IntegerType
>>> MyTable = SparkIcebergTable(
...     table="my.iceberg.table",
...     schema=StructType(
...         fields=[
...             StructField("id", IntegerType()),
...             StructField("amount", IntegerType()),
...         ]
...     )
... )

>>> import pyspark.sql.functions as F
>>> MyPartitionedTable = (
...     SparkIcebergTable(
...         table="my.iceberg.table"
...     ).with_save_options(
...         mode="overwritePartitions",
...         partition_by=(
...             ("colour",),
...             (F.years, "dt"),
...         )
...     )
... )

If a schema is provided, it is applied to the DataFrame before save and after load.

Saving is idempotent: if the target table does not exist, it is created with the configuration set in the save options.
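As a minimal sketch (hedged: it assumes the class exposes the load() and save() methods of ordeq's IO interface, that a SparkSession with an Iceberg catalog is configured, and that MyTable is defined as above), a save followed by a load looks like this; the schema is applied before the write and after the read:

>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.getOrCreate()
>>> df = spark.createDataFrame([(1, 100), (2, 250)], schema=MyTable.schema)
>>> MyTable.save(df)  # creates my.iceberg.table if it does not exist yet
>>> loaded = MyTable.load()  # returns a DataFrame with the schema applied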

Table properties can be specified via the properties attribute. Currently, the properties are only taken into account on write.

>>> TableWithProperties = SparkIcebergTable(
...     table="my.iceberg.table",
...     properties=(
...         ('read.split.target-size', '268435456'),
...         ('write.parquet.row-group-size-bytes', '268435456'),
...     )
... )

Currently, only a subset of Iceberg write modes is supported.

save(df, mode='overwritePartitions', partition_by=())

Saves the DataFrame to the Iceberg table.

Parameters:

- df (DataFrame): DataFrame to save. Required.
- mode (SparkIcebergWriteMode): write mode, one of:
  - "create": create the table, fail if it exists
  - "createOrReplace": create the table, replace if it exists
  - "overwrite": overwrite the table
  - "overwritePartitions": overwrite partitions of the table
  - "append": append to the table
  Default: 'overwritePartitions'.
- partition_by (tuple[tuple[Callable[[str], Column], str] | tuple[str], ...]): tuple of columns to partition by. Each element can be either a tuple of a transform function and a column name, e.g. (F.years, "dt"), or a single column name in a one-element tuple, e.g. ("colour",); see the usage sketch below. Default: ().

Raises:

- TypeError: if partition_by is not a tuple of tuples
- ValueError: if mode is not one of the supported modes
- RuntimeError: if the Spark captured exception cannot be parsed
- CapturedException: if there is an error during the write operation
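For illustration, a hedged sketch of calling save directly (assuming MyTable is defined as above and df is a DataFrame with colour and dt columns); partition_by mixes a plain column tuple and a transform, mirroring the signature documented above:

>>> import pyspark.sql.functions as F
>>> MyTable.save(
...     df,
...     mode="append",
...     partition_by=(
...         ("colour",),
...         (F.years, "dt"),
...     ),
... )

Passing a mode outside the list above raises ValueError, and a partition_by that is not a tuple of tuples raises TypeError, as documented in the Raises section.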