kedro.extras.datasets.spark.SparkDataSet

class kedro.extras.datasets.spark.SparkDataSet(filepath, file_format='parquet', load_args=None, save_args=None, version=None, credentials=None)[source]

Bases: kedro.io.core.AbstractVersionedDataSet

SparkDataSet loads and saves Spark dataframes.

Example:

from pyspark.sql import SparkSession
from pyspark.sql.types import (StructField, StringType,
                               IntegerType, StructType)

from kedro.extras.datasets.spark import SparkDataSet

schema = StructType([StructField("name", StringType(), True),
                     StructField("age", IntegerType(), True)])

data = [('Alex', 31), ('Bob', 12), ('Clarke', 65), ('Dave', 29)]

spark_df = SparkSession.builder.getOrCreate() \
                       .createDataFrame(data, schema)

data_set = SparkDataSet(filepath="test_data")
data_set.save(spark_df)
reloaded = data_set.load()

reloaded.take(4)
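
The constructor also accepts reader and writer options through load_args and save_args. Below is a minimal sketch, with a hypothetical local path, that switches to CSV and passes common Spark reader options (header, inferSchema) and writer options (mode, sep, header):

from kedro.extras.datasets.spark import SparkDataSet

# A minimal sketch (hypothetical path): CSV instead of the default parquet.
csv_data_set = SparkDataSet(
    filepath="data/01_raw/people.csv",  # hypothetical path
    file_format="csv",
    load_args={"header": True, "inferSchema": True},
    save_args={"mode": "overwrite", "sep": "|", "header": True},
)

csv_data_set.save(spark_df)  # spark_df from the example above
reloaded_csv = csv_data_set.load()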

Attributes

SparkDataSet.DEFAULT_LOAD_ARGS
SparkDataSet.DEFAULT_SAVE_ARGS

Methods

SparkDataSet.__init__(filepath[, …]) Creates a new instance of SparkDataSet.
SparkDataSet.exists() Checks whether a data set’s output already exists by calling the provided _exists() method.
SparkDataSet.from_config(name, config[, …]) Create a data set instance using the configuration provided.
SparkDataSet.load() Loads data by delegation to the provided load method.
SparkDataSet.release() Release any cached data.
SparkDataSet.resolve_load_version() Compute the version the dataset should be loaded with.
SparkDataSet.resolve_save_version() Compute the version the dataset should be saved with.
SparkDataSet.save(data) Saves data by delegation to the provided save method.
DEFAULT_LOAD_ARGS = {}
DEFAULT_SAVE_ARGS = {}
__init__(filepath, file_format='parquet', load_args=None, save_args=None, version=None, credentials=None)[source]

Creates a new instance of SparkDataSet.

Parameters:
  • filepath (str) – Path to a Spark dataframe. When using Databricks and working with data written to mount path points, specify filepaths for (versioned) SparkDataSets starting with /dbfs/mnt.
  • file_format (str) – File format used during load and save operations. These are formats supported by the running SparkContext and include parquet and csv. For a list of supported formats please refer to the Apache Spark documentation at https://spark.apache.org/docs/latest/sql-programming-guide.html
  • load_args (Optional[Dict[str, Any]]) – Load args passed to the Spark DataFrameReader load method. These depend on the selected file format. You can find a list of read options for each supported format in the Spark DataFrame read documentation: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame
  • save_args (Optional[Dict[str, Any]]) – Save args passed to the Spark DataFrame write method. Like load_args, these depend on the selected file format. You can pass mode and partitionBy to specify your overwrite mode and partitioning respectively. You can find a list of options for each format in the Spark DataFrame write documentation: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame
  • version (Optional[Version]) – If specified, should be an instance of kedro.io.core.Version. If its load attribute is None, the latest version will be loaded. If its save attribute is None, the save version will be autogenerated (a minimal sketch follows this parameter list).
  • credentials (Optional[Dict[str, Any]]) – Credentials to access the S3 bucket, such as aws_access_key_id, aws_secret_access_key, if filepath prefix is s3a:// or s3n://. Optional keyword arguments passed to hdfs.client.InsecureClient if filepath prefix is hdfs://. Ignored otherwise.
Return type:

None

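As noted for the version parameter above, a Version with both attributes set to None loads the latest version and autogenerates the save version. A minimal sketch, with a hypothetical path:

from kedro.io import Version
from kedro.extras.datasets.spark import SparkDataSet

# A minimal sketch (hypothetical path): load=None resolves to the latest
# existing version, save=None autogenerates a timestamped save version.
versioned_data_set = SparkDataSet(
    filepath="data/02_intermediate/people",  # hypothetical path
    version=Version(load=None, save=None),
)
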
exists()

Checks whether a data set’s output already exists by calling the provided _exists() method.

Return type: bool
Returns: Flag indicating whether the output already exists.
Raises: DataSetError – when the underlying exists method raises an error.
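
For instance, exists() can guard a save so that existing output is not overwritten. A minimal sketch, reusing data_set and spark_df from the example above:

# Only save if the output does not already exist.
if not data_set.exists():
    data_set.save(spark_df)
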
classmethod from_config(name, config, load_version=None, save_version=None)

Create a data set instance using the configuration provided.

Parameters:
  • name (str) – Data set name.
  • config (Dict[str, Any]) – Data set config dictionary.
  • load_version (Optional[str]) – Version string to be used for load operation if the data set is versioned. Has no effect on the data set if versioning was not enabled.
  • save_version (Optional[str]) – Version string to be used for save operation if the data set is versioned. Has no effect on the data set if versioning was not enabled.
Return type:

AbstractDataSet

Returns:

An instance of an AbstractDataSet subclass.

Raises:

DataSetError – When the function fails to create the data set from its config.
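
A minimal sketch of from_config, assuming the config dictionary follows the Data Catalog format with a type key (the name, path and options are hypothetical):

from kedro.extras.datasets.spark import SparkDataSet

data_set = SparkDataSet.from_config(
    "weather",  # hypothetical data set name
    {
        "type": "kedro.extras.datasets.spark.SparkDataSet",
        "filepath": "s3a://your_bucket/data/01_raw/weather*",  # hypothetical
        "file_format": "csv",
        "load_args": {"header": True},
    },
)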

load()

Loads data by delegation to the provided load method.

Return type: Any
Returns: Data returned by the provided load method.
Raises: DataSetError – when the underlying load method raises an error.
release()

Release any cached data.

Raises: DataSetError – when the underlying release method raises an error.
Return type: None
resolve_load_version()

Compute the version the dataset should be loaded with.

Return type: Optional[str]
resolve_save_version()

Compute the version the dataset should be saved with.

Return type: Optional[str]
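
Continuing the versioned sketch above (names and path hypothetical), the resolved versions can be inspected directly; resolve_load_version assumes at least one version has already been saved:

versioned_data_set.save(spark_df)                 # saved under an autogenerated version
print(versioned_data_set.resolve_save_version())  # e.g. a "2021-01-01T00.00.00.000Z"-style timestamp
print(versioned_data_set.resolve_load_version())  # latest existing version on disk
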
save(data)

Saves data by delegation to the provided save method.

Parameters: data (Any) – The value to be saved by the provided save method.
Raises: DataSetError – when the underlying save method raises an error.
Return type: None