iceberg.py
SparkIcebergTable
dataclass
Bases: SparkTable, IO[DataFrame]
IO used to load and save Iceberg tables using Spark.
Example usage:
>>> from ordeq_spark import SparkIcebergTable
>>> from pyspark.sql.types import StructType, StructField, IntegerType
>>> MyTable = SparkIcebergTable(
...     table="my.iceberg.table",
...     schema=StructType(
...         fields=[
...             StructField("id", IntegerType()),
...             StructField("amount", IntegerType()),
...         ]
...     )
... )
>>> import pyspark.sql.functions as F
>>> MyPartitionedTable = SparkIcebergTable(
...     table="my.iceberg.table"
... ).with_save_options(
...     mode="overwritePartitions",
...     partition_by=(
...         ("colour",),
...         (F.years, "dt"),
...     )
... )
If `schema` is provided, it will be applied before save and after load.
Saving is idempotent: if the target table does not exist, it is created with the configuration set in the save options.
Table properties can be specified on the `properties` attribute. Currently, the properties are taken into account on write only.
>>> TableWithProperties = SparkIcebergTable(
...     table="my.iceberg.table",
...     properties=(
...         ("read.split.target-size", "268435456"),
...         ("write.parquet.row-group-size-bytes", "268435456"),
...     )
... )
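Since `properties` is a tuple of key-value pairs, it maps naturally onto a dict. A minimal illustration of that shape (plain Python only; this is not the library's internal handling):

```python
# Table properties as a tuple of (key, value) pairs, as accepted
# by SparkIcebergTable's `properties` attribute.
properties = (
    ("read.split.target-size", "268435456"),
    ("write.parquet.row-group-size-bytes", "268435456"),
)

# The pair form converts losslessly to a dict for lookup or merging.
props = dict(properties)
print(props["read.split.target-size"])  # 268435456
```

The tuple-of-pairs form keeps the dataclass hashable; converting to a dict is only needed when inspecting or merging properties.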
Currently, only a subset of Iceberg writes is supported.
save(df, mode='overwritePartitions', partition_by=())
Saves the DataFrame to the Iceberg table.
Parameters:

Name | Type | Description | Default |
---|---|---|---|
`df` | `DataFrame` | DataFrame to save | *required* |
`mode` | `SparkIcebergWriteMode` | Write mode, one of:<br>- `"create"`: create the table, fail if it exists<br>- `"createOrReplace"`: create the table, replace if it exists<br>- `"overwrite"`: overwrite the table<br>- `"overwritePartitions"`: overwrite partitions of the table<br>- `"append"`: append to the table | `'overwritePartitions'` |
`partition_by` | `tuple[tuple[Callable[[str], Column], str] \| tuple[str], ...]` | Tuple of columns to partition by. Each element can be:<br>- a tuple of a function and a column name, e.g. `(F.years, "dt")`<br>- a single column name as a string, e.g. `"colour"` | `()` |
Raises:

Type | Description |
---|---|
`TypeError` | if `partition_by` is not a tuple of tuples |
`ValueError` | if `mode` is not one of the supported modes |
`RuntimeError` | if the Spark captured exception cannot be parsed |
`CapturedException` | if there is an error during the write operation |
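The `TypeError` and `ValueError` cases can be illustrated with a small validation sketch. The helper name and messages are hypothetical, not the library's actual implementation; only the documented modes and the tuple-of-tuples constraint come from the source:

```python
# The five write modes documented for SparkIcebergTable.save.
VALID_MODES = (
    "create",
    "createOrReplace",
    "overwrite",
    "overwritePartitions",
    "append",
)

def validate_save_args(mode: str, partition_by: object) -> None:
    """Hypothetical pre-save validation mirroring the documented errors."""
    # ValueError: mode must be one of the supported write modes.
    if mode not in VALID_MODES:
        raise ValueError(f"mode must be one of {VALID_MODES}, got {mode!r}")
    # TypeError: partition_by must be a tuple of tuples.
    if not isinstance(partition_by, tuple) or not all(
        isinstance(entry, tuple) for entry in partition_by
    ):
        raise TypeError("partition_by must be a tuple of tuples")

validate_save_args("append", (("colour",),))  # passes silently
```

The `RuntimeError` and `CapturedException` cases arise inside Spark during the actual write and are not reproducible without a running session.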