Working with PySpark

Note: This documentation is based on Kedro 0.16.2. If you spot anything that is incorrect, please create an issue or pull request.

This page outlines some best practices when building a Kedro pipeline with PySpark. It assumes a basic understanding of both Kedro and PySpark.

Centralise Spark configuration in conf/base/spark.yml

Spark allows you to specify many different configuration options. We recommend storing all of these options in a single file located at conf/base/spark.yml. Below is an example of the content of that file, which sets the Spark driver's maxResultSize and enables the FAIR scheduler:

spark.driver.maxResultSize: 3g
spark.scheduler.mode: FAIR
Note: Optimal configuration for Spark depends on the setup of your Spark cluster.

Initialise a SparkSession in ProjectContext

Before any PySpark operations are performed, you should initialise your SparkSession in your ProjectContext, which is the entrypoint for your Kedro project. This ensures that a SparkSession has been initialised before the Kedro pipeline is run.

Below is an example implementation to initialise the SparkSession in <project-name>/src/<package-name>/run.py by reading configuration from the spark.yml configuration file created in the previous section:

from pathlib import Path
from typing import Any, Dict, Union

from kedro.framework.context import KedroContext
from kedro.pipeline import Pipeline
from pyspark import SparkConf
from pyspark.sql import SparkSession

# `create_pipelines` is defined in your project's pipeline.py;
# replace `your_package` with your project's package name.
from your_package.pipeline import create_pipelines


class ProjectContext(KedroContext):
    """A Kedro project context that also initialises a SparkSession."""

    project_name = "kedro"
    # `project_version` is the version of Kedro used to generate the project
    project_version = "0.16.2"

    def __init__(
        self,
        project_path: Union[Path, str],
        env: str = None,
        extra_params: Dict[str, Any] = None,
    ):
        super().__init__(project_path, env, extra_params)
        self.init_spark_session()

    def init_spark_session(self) -> None:
        """Initialises a SparkSession using the config defined in the project's conf folder."""

        # Load the Spark configuration from spark.yml using the config loader
        parameters = self.config_loader.get("spark*", "spark*/**")
        spark_conf = SparkConf().setAll(parameters.items())

        # Initialise the Spark session
        spark_session_conf = (
            SparkSession.builder.appName(self.project_name)
            .enableHiveSupport()
            .config(conf=spark_conf)
        )
        _spark_session = spark_session_conf.getOrCreate()
        _spark_session.sparkContext.setLogLevel("WARN")

    def _get_pipelines(self) -> Dict[str, Pipeline]:
        return create_pipelines()

You should modify this code to adapt it to your cluster’s setup, e.g. setting master to yarn if you are running Spark on YARN.
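
For example, to run Spark on YARN you could add something like the following to conf/base/spark.yml; the exact value depends on your cluster's setup:

spark.master: yarn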

Call SparkSession.builder.getOrCreate() to obtain the SparkSession anywhere in your pipeline. Since the SparkSession is a global singleton, this returns the same session that was initialised in ProjectContext.
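
For instance, a node that needs direct access to the session could retrieve it as follows (a minimal sketch; the node name and query are illustrative):

from pyspark.sql import DataFrame, SparkSession


def count_weather_rows(weather: DataFrame) -> DataFrame:
    """Hypothetical node that needs direct access to the SparkSession."""
    # Returns the same global session initialised in ProjectContext
    spark = SparkSession.builder.getOrCreate()
    # Register a temporary view so we can query it with Spark SQL
    weather.createOrReplaceTempView("weather")
    return spark.sql("SELECT COUNT(*) AS n_rows FROM weather")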

We don’t recommend storing the session on the context object, as it cannot be serialised and therefore prevents the context from being initialised for some plugins.

Use Kedro’s built-in Spark datasets to load and save raw data

We recommend using Kedro’s built-in Spark datasets to load raw data into a Spark DataFrame, as well as to write it back to storage. Our built-in Spark datasets include spark.SparkDataSet, spark.SparkHiveDataSet and spark.SparkJDBCDataSet.

The example below illustrates how to use spark.SparkDataSet to read a CSV file located on S3 into a DataFrame, configured in <project-name>/conf/base/catalog.yml:

weather:
  type: spark.SparkDataSet
  filepath: s3a://your_bucket/data/01_raw/weather*
  file_format: csv
  load_args:
    header: True
    inferSchema: True
  save_args:
    sep: '|'
    header: True

Or using the Python API:

import pyspark.sql
from kedro.io import DataCatalog
from kedro.extras.datasets.spark import SparkDataSet

spark_ds = SparkDataSet(
    filepath="s3a://your_bucket/data/01_raw/weather*",
    file_format="csv",
    load_args={"header": True, "inferSchema": True},
    save_args={"sep": "|", "header": True},
)
catalog = DataCatalog({"weather": spark_ds})

df = catalog.load("weather")
assert isinstance(df, pyspark.sql.DataFrame)

Use MemoryDataSet for intermediary DataFrames

For nodes operating on DataFrames that don’t need to perform Spark actions such as writing the DataFrame to storage, we recommend using the default MemoryDataSet to hold the DataFrame. In other words, there is no need to specify it in the DataCatalog or catalog.yml. This allows you to take advantage of Spark’s optimiser and lazy evaluation.
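
As a minimal sketch (the dataset names, node names and columns below are illustrative), only the first and last datasets would need catalog entries; the intermediate preprocessed_weather output is held in a MemoryDataSet automatically:

from kedro.pipeline import Pipeline, node
from pyspark.sql import DataFrame
from pyspark.sql.functions import col


def preprocess_weather(weather: DataFrame) -> DataFrame:
    """Hypothetical transformation; no Spark action is triggered here."""
    return weather.filter(col("temperature").isNotNull())


def aggregate_weather(preprocessed_weather: DataFrame) -> DataFrame:
    """Hypothetical aggregation; still lazy until the output is saved."""
    return preprocessed_weather.groupBy("station_id").avg("temperature")


def create_pipeline(**kwargs):
    return Pipeline(
        [
            # "weather" and "aggregated_weather" would be spark.SparkDataSet
            # entries in catalog.yml; "preprocessed_weather" is deliberately
            # left out, so it defaults to a MemoryDataSet.
            node(preprocess_weather, "weather", "preprocessed_weather"),
            node(aggregate_weather, "preprocessed_weather", "aggregated_weather"),
        ]
    )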

Use MemoryDataSet with copy_mode="assign" for non-DataFrame Spark objects

Sometimes, you might want to use Spark objects that aren’t DataFrames as inputs and outputs in your pipeline. For example, suppose you have a train_model node that trains a classifier using Spark ML’s RandomForestClassifier and a predict node that makes predictions using this classifier. In this scenario, the train_model node will output a fitted RandomForestClassificationModel object, which then becomes the input for the predict node. Below is the code for this pipeline:

from kedro.pipeline import Pipeline, node
from pyspark.ml.classification import (
    RandomForestClassificationModel,
    RandomForestClassifier,
)
from pyspark.sql import DataFrame


def train_model(training_data: DataFrame) -> RandomForestClassificationModel:
    """Node for training a random forest model to classify the data."""
    classifier = RandomForestClassifier(numTrees=10)
    return classifier.fit(training_data)


def predict(
    model: RandomForestClassificationModel, testing_data: DataFrame
) -> DataFrame:
    """Node for making predictions given a pre-trained model and a testing dataset."""
    predictions = model.transform(testing_data)
    return predictions


def create_pipeline(**kwargs):
    return Pipeline(
        [
            node(
                train_model,
                inputs=["training_data"],
                outputs="example_classifier",
            ),
            node(
                predict,
                inputs=dict(model="example_classifier", testing_data="testing_data"),
                outputs="example_predictions",
            ),
        ]
    )

To make the pipeline work, you will need to specify example_classifier as follows in the catalog.yml:

example_classifier:
  type: MemoryDataSet
  copy_mode: assign

The assign copy mode ensures that the MemoryDataSet is assigned the Spark object itself rather than a deep copy of it, since deep copying generally doesn’t work with Spark objects.
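
Using the Python API, the equivalent would look roughly like this (a sketch, assuming you construct the DataCatalog yourself):

from kedro.io import DataCatalog, MemoryDataSet

# The Spark model object is assigned directly, not deep-copied
catalog = DataCatalog({"example_classifier": MemoryDataSet(copy_mode="assign")})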

Tips for maximising concurrency using ThreadRunner

Under the hood, every Kedro node that performs a Spark action (e.g. save, collect) is submitted to the Spark cluster as a Spark job through the same SparkSession instance. These jobs can run concurrently if they are submitted by different threads. To achieve this, run your Kedro pipeline with the ThreadRunner:

kedro run --runner=ThreadRunner
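
If you run pipelines programmatically rather than through the CLI, a rough equivalent (assuming you already have pipeline and catalog objects in scope) is:

from kedro.runner import ThreadRunner

# max_workers is optional; it caps the number of worker threads
runner = ThreadRunner(max_workers=4)
runner.run(pipeline, catalog)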

To further increase concurrency, if you are using Spark >= 0.8 you can also turn on fair sharing, which gives each node a roughly equal share of the Spark cluster and therefore a roughly equal chance of being executed concurrently. By default, jobs are executed in a FIFO manner, which means that a job taking up too many resources can hold up the execution of other jobs. To turn on fair sharing, put the following in your conf/base/spark.yml file, which was created in the Centralise Spark configuration section:

spark.scheduler.mode: FAIR

For more information, please visit the Spark documentation on job scheduling within an application.