spark_job_group.py

SparkJobGroupHook

Bases: NodeHook

Node hook that sets the Spark job group to the node name. Please make sure the Spark session is initialized before using this hook.

Example usage:

>>> from ordeq.framework import node
>>> from ordeq_spark import SparkHiveTable, SparkJobGroupHook
>>> from pyspark.sql import DataFrame
>>> from ordeq.framework.runner import run

>>> @node(
...     inputs=SparkHiveTable(table="tables.a"),
...     outputs=SparkHiveTable(table="tables.b"),
... )
... def append(a: DataFrame) -> DataFrame:
...     return a.union(a)

>>> run(append, hooks=[SparkJobGroupHook()]) # doctest: +SKIP
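
The hook requires an active Spark session. A minimal sketch of creating one before running, assuming standard PySpark (`enableHiveSupport` is only shown because the example above reads and writes Hive tables):

>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.enableHiveSupport().getOrCreate()  # doctest: +SKIP
>>> run(append, hooks=[SparkJobGroupHook()])  # doctest: +SKIP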

before_node_run(node)

Sets the node name as the job group in the Spark context. This makes it much easier to find a node's jobs in the Spark history server.

Parameters:

    node (Node): the node (required)

Raises:

    RuntimeError: if the Spark session is not active
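
For illustration, a rough sketch of the behaviour described above, assuming the standard PySpark API; `set_job_group` is a hypothetical stand-in, not the actual implementation in spark_job_group.py:

>>> from pyspark.sql import SparkSession
>>> def set_job_group(node_name: str) -> None:
...     # Hypothetical helper mirroring the documented behaviour of before_node_run.
...     spark = SparkSession.getActiveSession()
...     if spark is None:
...         # Matches the documented RuntimeError when no Spark session is active.
...         raise RuntimeError("Spark session is not active")
...     # Tag subsequent jobs with the node name so they are easy to find
...     # in the Spark UI and history server.
...     spark.sparkContext.setJobGroup(node_name, f"ordeq node: {node_name}")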