kedro.io.PartitionedDataSet

class kedro.io.PartitionedDataSet(path, dataset, filepath_arg='filepath', filename_suffix='', credentials=None, load_args=None, fs_args=None, overwrite=False)[source]

PartitionedDataSet loads and saves partitioned file-like data using the underlying dataset definition. For filesystem-level operations it uses fsspec: https://github.com/intake/filesystem_spec.

It also supports advanced features like lazy saving.

Example of adding a catalog entry using the YAML API:

cv_results_partitioned: # example to save results to multiple partitions
  type: PartitionedDataSet
  dataset:
    type: pandas.CSVDataSet
    save_args:
      index: False
  path: data/04_cv/
  filename_suffix: ".csv"

downloaded_data: # example with data available in multiple partitions
  type: PartitionedDataSet
  path: demo/01_raw/downloaded_station_data
  dataset:
    type: pandas.CSVDataSet
    load_args:
      sep: ','
      index_col: 0
  filename_suffix: '.csv'
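In a pipeline node, a partitioned input arrives as a dictionary mapping partition ids to load functions that each load one partition on demand. A minimal sketch of a node that concatenates all partitions into one DataFrame (the function name `concat_partitions` is illustrative, not part of the Kedro API):

```python
from typing import Callable, Dict

import pandas as pd


def concat_partitions(
    partitioned_input: Dict[str, Callable[[], pd.DataFrame]]
) -> pd.DataFrame:
    """Concatenate all partitions of a PartitionedDataSet input.

    ``partitioned_input`` maps each partition id to a function that
    loads that partition only when called.
    """
    result = pd.DataFrame()
    for partition_id in sorted(partitioned_input):
        # Calling the value loads this partition into memory
        partition_data = partitioned_input[partition_id]()
        result = pd.concat([result, partition_data], ignore_index=True)
    return result
```

Iterating over `sorted(partitioned_input)` makes the concatenation order deterministic, since dictionary order otherwise depends on how the filesystem listed the files.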

Example using the Python API:

import pandas as pd
from kedro.io import PartitionedDataSet

# Create a pandas DataFrame with 10 rows of data
df = pd.DataFrame([{"DAY_OF_MONTH": str(i), "VALUE": i} for i in range(1, 11)])

# Convert it to a dict of pd.DataFrame with DAY_OF_MONTH as the dict key
dict_df = {
    day_of_month: df[df["DAY_OF_MONTH"] == day_of_month]
    for day_of_month in df["DAY_OF_MONTH"]
}

# Save it as small partitions with DAY_OF_MONTH as the partition key
data_set = PartitionedDataSet(
    path="df_with_partition",
    dataset="pandas.CSVDataSet",
    filename_suffix=".csv"
)

# This will create a folder `df_with_partition` and save multiple files
# with the dict key + filename_suffix as filename, i.e. 1.csv, 2.csv etc.
data_set.save(dict_df)

# This returns lazy load functions instead of loading data into memory immediately.
loaded = data_set.load()

# Load all the partitions
for partition_id, partition_load_func in loaded.items():
    # The actual function that loads the data
    partition_data = partition_load_func()

    # Add the processing logic for an individual partition HERE
    print(partition_data)

In reality, you may load multiple partitions from remote storage and combine them like this:

import pandas as pd
from kedro.io import PartitionedDataSet

# these credentials will be passed to both 'fsspec.filesystem()' call
# and the dataset initializer
credentials = {"key1": "secret1", "key2": "secret2"}

data_set = PartitionedDataSet(
    path="s3://bucket-name/path/to/folder",
    dataset="pandas.CSVDataSet",
    credentials=credentials
)
loaded = data_set.load()
# assert isinstance(loaded, dict)

combine_all = pd.DataFrame()

for partition_id, partition_load_func in loaded.items():
    partition_data = partition_load_func()
    combine_all = pd.concat(
        [combine_all, partition_data], ignore_index=True, sort=True
    )

new_data = pd.DataFrame({"new": [1, 2]})
# creates "s3://bucket-name/path/to/folder/new/partition.csv"
data_set.save({"new/partition.csv": new_data})

Methods

exists()

Checks whether a data set’s output already exists by calling the provided _exists() method.

from_config(name, config[, load_version, …])

Create a data set instance using the configuration provided.

load()

Loads data by delegation to the provided load method.

release()

Release any cached data.

save(data)

Saves data by delegation to the provided save method.

__init__(path, dataset, filepath_arg='filepath', filename_suffix='', credentials=None, load_args=None, fs_args=None, overwrite=False)[source]

Creates a new instance of PartitionedDataSet.

Parameters
  • path (str) – Path to the folder containing partitioned data. If path starts with the protocol (e.g., s3://) then the corresponding fsspec concrete filesystem implementation will be used. If protocol is not specified, fsspec.implementations.local.LocalFileSystem will be used. Note: Some concrete implementations are bundled with fsspec, while others (like s3 or gcs) must be installed separately prior to usage of the PartitionedDataSet.

  • dataset (Union[str, Type[AbstractDataSet], Dict[str, Any]]) – Underlying dataset definition. This is used to instantiate the dataset for each file located inside the path. Accepted formats are: a) object of a class that inherits from AbstractDataSet b) a string representing a fully qualified class name to such class c) a dictionary with type key pointing to a string from b), other keys are passed to the Dataset initializer. Credentials for the dataset can be explicitly specified in this configuration.

  • filepath_arg (str) – Underlying dataset initializer argument that will contain a path to each corresponding partition file. If unspecified, defaults to “filepath”.

  • filename_suffix (str) – If specified, only partitions that end with this string will be processed.

  • credentials (Optional[Dict[str, Any]]) – Protocol-specific options that will be passed to fsspec.filesystem https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.filesystem and the dataset initializer. If the dataset config contains explicit credentials spec, then such spec will take precedence. All possible credentials management scenarios are documented here: https://kedro.readthedocs.io/en/stable/data/kedro_io.html#partitioned-dataset-credentials

  • load_args (Optional[Dict[str, Any]]) – Keyword arguments to be passed into the find() method of the corresponding filesystem implementation.

  • fs_args (Optional[Dict[str, Any]]) – Extra arguments to pass into the underlying filesystem class constructor (e.g. {"project": "my-project"} for GCSFileSystem).

  • overwrite (bool) – If True, any existing partitions will be removed.

Raises

DataSetError – If versioning is enabled for the underlying dataset.

exists()

Checks whether a data set’s output already exists by calling the provided _exists() method.

Return type

bool

Returns

Flag indicating whether the output already exists.

Raises

DataSetError – when the underlying exists method raises an error.

classmethod from_config(name, config, load_version=None, save_version=None)

Create a data set instance using the configuration provided.

Parameters
  • name (str) – Data set name.

  • config (Dict[str, Any]) – Data set config dictionary.

  • load_version (Optional[str]) – Version string to be used for load operation if the data set is versioned. Has no effect on the data set if versioning was not enabled.

  • save_version (Optional[str]) – Version string to be used for save operation if the data set is versioned. Has no effect on the data set if versioning was not enabled.

Return type

AbstractDataSet

Returns

An instance of an AbstractDataSet subclass.

Raises

DataSetError – When the function fails to create the data set from its config.

load()

Loads data by delegation to the provided load method.

Return type

~_DO

Returns

Data returned by the provided load method.

Raises

DataSetError – When the underlying load method raises an error.

release()

Release any cached data.

Raises

DataSetError – when the underlying release method raises an error.

Return type

None

save(data)

Saves data by delegation to the provided save method.

Parameters

data (~_DI) – the value to be saved by the provided save method.

Raises
  • DataSetError – when the underlying save method raises an error.

  • FileNotFoundError – when the save method receives a file instead of a directory, on Windows.

  • NotADirectoryError – when the save method receives a file instead of a directory, on Unix.

Return type

None