Creating a new dataset

Even though Kedro supports many datasets out of the box to help you with your I/O needs, sometimes they are not enough. For example, you might be dealing with a particular proprietary data format or filesystem in your pipeline, or perhaps you have found a popular use case for a dataset that isn’t supported by Kedro yet and want to contribute. This tutorial will help you understand what’s involved in the process of creating a new dataset and contributing it to Kedro.

Scenario

Suppose we are training a model to classify the type of a Pokémon, e.g. Water, Fire, Bug, etc., based on its appearance. We will be using this Kaggle dataset, which contains ~2MB of Pokémon images as well as a CSV containing their corresponding types.

Project setup

Let’s bootstrap our project with kedro new and name it kedro-pokemon. Assuming Kedro is already installed, project creation is a single interactive command:
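
$ kedro new

Then download the Kaggle dataset and save it in data/01_raw. The data directory structure should be: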

data
├── 01_raw
│   └── pokemon-images-and-types
├── 02_intermediate
├── 03_primary
├── 04_features
├── 05_model_input
├── 06_models
├── 07_model_output
└── 08_reporting

Problem

In order to train our model, we need a mechanism to read the Pokémon images from PNG files into numpy arrays for further manipulation in our Kedro pipeline. As Kedro doesn’t provide a dataset for working with PNG images out of the box, this is a good opportunity to create an ImageDataSet to facilitate reading and saving image data in our project. The dataset will use Pillow under the hood for generic image processing functionality, so it will work with many different image formats, not just PNG.

To install Pillow:

$ pip install Pillow

If you run into any installation problems, please head over to the Pillow documentation for more details.
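
To quickly confirm that the installation succeeded (Pillow 5.2+ exposes a PIL.__version__ attribute):

$ python -c "import PIL; print(PIL.__version__)"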

The anatomy of a dataset

At the minimum, a valid Kedro dataset needs to subclass the base AbstractDataSet and provide an implementation for the _load, _save and _describe abstract methods. For example, a skeleton for our ImageDataSet might look like:

from typing import Any, Dict

import numpy as np

from kedro.io import AbstractDataSet


class ImageDataSet(AbstractDataSet):
    """``ImageDataSet`` loads / save image data from a given filepath as `numpy` array using Pillow.

    Example:
    ::

        >>> ImageDataSet(filepath='/img/file/path.png')
    """

    def __init__(self, filepath: str):
        """Creates a new instance of ImageDataSet to load / save image data at the given filepath.

        Args:
            filepath: The location of the image file to load / save data.
        """
        self._filepath = filepath

    def _load(self) -> np.ndarray:
        """Loads data from the image file.

        Returns:
            Data from the image file as a numpy array.
        """
        ...

    def _save(self, data: np.ndarray) -> None:
        """Saves image data to the specified filepath"""
        ...

    def _describe(self) -> Dict[str, Any]:
        """Returns a dict that describes the attributes of the dataset"""
        ...

We can place this dataset definition in src/kedro_pokemon/io/datasets/image_dataset.py, mimicking the structure of Kedro’s own codebase:

src/kedro_pokemon/io
├── __init__.py
└── datasets
    ├── __init__.py
    └── image_dataset.py

Implement the _load method with fsspec

Under the hood, many of Kedro’s built-in datasets rely on fsspec to interface with various data sources in a consistent manner; more information can be found in the fsspec documentation. In our scenario, it’s particularly convenient to use fsspec in conjunction with Pillow to read image data, so that the dataset works flexibly not only with different image formats but also with different image locations, such as S3, GCS, or the local filesystem. Below is an implementation of the _load method using fsspec and Pillow to read the data of a single image into a numpy array:

from pathlib import PurePosixPath

from kedro.io import AbstractDataSet
from kedro.io.core import get_filepath_str, get_protocol_and_path

import fsspec
import numpy as np

# PIL is the package name under which Pillow is installed
from PIL import Image


class ImageDataSet(AbstractDataSet):
    def __init__(self, filepath: str):
        """Creates a new instance of ImageDataSet to load / save image data for given filepath.

        Args:
            filepath: The location of the image file to load / save data.
        """
        # parse the path and protocol (e.g. file, http, s3, etc.)
        protocol, path = get_protocol_and_path(filepath)
        self._protocol = protocol
        self._filepath = PurePosixPath(path)
        self._fs = fsspec.filesystem(self._protocol)

    def _load(self) -> np.ndarray:
        """Loads data from the image file.

        Returns:
            Data from the image file as a numpy array
        """
        # using get_filepath_str ensures that the protocol and path are appended correctly for different filesystems
        load_path = get_filepath_str(self._filepath, self._protocol)
        with self._fs.open(load_path, mode="rb") as f:
            image = Image.open(f).convert('RGBA')
            return np.asarray(image)

To test this out, let’s add a dataset to the data catalog to load Pikachu’s image.

# in conf/base/catalog.yml

pikachu:
  type: kedro_pokemon.io.datasets.image_dataset.ImageDataSet
  filepath: data/01_raw/pokemon-images-and-types/images/images/pikachu.png
  # Note: the duplicated `images` path is part of the original Kaggle dataset

Then launch an IPython session with kedro ipython to preview the data:

# read data image into a numpy array
In [1]: image = context.catalog.load('pikachu')

# then display the image using Pillow's Image API.
In [2]: from PIL import Image
In [3]: Image.fromarray(image).show()

Implement the _save method with fsspec

Similarly, we can implement our _save method as follows:

import numpy as np
from PIL import Image
from kedro.io import AbstractDataSet
from kedro.io.core import get_filepath_str


class ImageDataSet(AbstractDataSet):
    def _save(self, data: np.ndarray) -> None:
        """Saves image data to the specified filepath.
        """
        # using get_filepath_str ensures that the protocol and path are appended correctly for different filesystems
        save_path = get_filepath_str(self._filepath, self._protocol)
        with self._fs.open(save_path, mode="wb") as f:
            image = Image.fromarray(data)
            image.save(f)

Let’s try it out in IPython:

In [1]: image = context.catalog.load('pikachu')
In [2]: context.catalog.save('pikachu', data=image)

You can open the file to verify that the data was written back correctly.
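
Alternatively, here is a quick round-trip check in the same IPython session. Since PNG is lossless and our _load always converts to RGBA, the reloaded array should match the one we just saved:

In [3]: import numpy as np
In [4]: reloaded = context.catalog.load('pikachu')
In [5]: np.array_equal(image, reloaded)
Out[5]: True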

Implement the _describe method

The _describe method is used for printing purposes. The convention in Kedro is for the method to return a dictionary describing the attributes of the dataset.

from typing import Any, Dict

from kedro.io import AbstractDataSet


class ImageDataSet(AbstractDataSet):
    def _describe(self) -> Dict[str, Any]:
        """Returns a dict that describes the attributes of the dataset.
        """
        return dict(
            filepath=self._filepath,
            protocol=self._protocol
        )

Bringing it all together

Here is the full implementation of our basic ImageDataSet:

from pathlib import PurePosixPath
from typing import Any, Dict

from kedro.io import AbstractDataSet
from kedro.io.core import get_filepath_str, get_protocol_and_path

import fsspec
import numpy as np
from PIL import Image


class ImageDataSet(AbstractDataSet):
    """``ImageDataSet`` loads / save image data from a given filepath as `numpy` array using Pillow.

    Example:
    ::

        >>> ImageDataSet(filepath='/img/file/path.png')
    """

    def __init__(self, filepath: str):
        """Creates a new instance of ImageDataSet to load / save image data for given filepath.

        Args:
            filepath: The location of the image file to load / save data.
        """
        # parse the path and protocol (e.g. file, http, s3, etc.)
        protocol, path = get_protocol_and_path(filepath)
        self._protocol = protocol
        self._filepath = PurePosixPath(path)
        self._fs = fsspec.filesystem(self._protocol)

    def _load(self) -> np.ndarray:
        """Loads data from the image file.

        Returns:
            Data from the image file as a numpy array
        """
        # using get_filepath_str ensures that the protocol and path are appended correctly for different filesystems
        load_path = get_filepath_str(self._filepath, self._protocol)
        with self._fs.open(load_path, mode="rb") as f:
            image = Image.open(f).convert('RGBA')
            return np.asarray(image)

    def _save(self, data: np.ndarray) -> None:
        """Saves image data to the specified filepath.
        """
        # using get_filepath_str ensures that the protocol and path are appended correctly for different filesystems
        save_path = get_filepath_str(self._filepath, self._protocol)
        with self._fs.open(save_path, mode="wb") as f:
            image = Image.fromarray(data)
            image.save(f)

    def _describe(self) -> Dict[str, Any]:
        """Returns a dict that describes the attributes of the dataset.
        """
        return dict(
            filepath=self._filepath,
            protocol=self._protocol
        )

Integrating with PartitionedDataSet

A keen reader might have noticed that, up until now, our ImageDataSet only works with a single image. What about loading all the Pokémon images from the raw data directory for further processing? The answer is to use a PartitionedDataSet.

PartitionedDataSet is a convenient way to load many separate data files of the same underlying dataset type from a directory. For example, in our Pokémon pipeline, to use PartitionedDataSet with our ImageDataSet to load all the Pokémon PNG images, simply add this to the data catalog:

pokemon:
  type: PartitionedDataSet
  dataset: kedro_pokemon.io.datasets.image_dataset.ImageDataSet
  path: data/01_raw/pokemon-images-and-types/images/images
  filename_suffix: ".png"

Let’s try it out in the IPython console:

In [1]: images = context.catalog.load('pokemon')
In [2]: len(images)
Out[2]: 721

Verify the number of .png files in the data directory is indeed 721:

$ ls -la data/01_raw/pokemon-images-and-types/images/images/*.png | wc -l
    721

This shows that the PartitionedDataSet has loaded all PNG files from the data directory using the underlying ImageDataSet. More importantly, we have enabled this capability just by updating a few lines of YAML. Pretty neat, right?
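
Note that PartitionedDataSet loads lazily: the images dictionary maps each partition ID (the filename without the .png suffix) to a callable that loads that partition on demand using our ImageDataSet under the hood. A minimal sketch of materialising a single partition:

In [3]: load_pikachu = images['pikachu']  # each value is a load function, not the data itself
In [4]: pikachu_array = load_pikachu()    # calling it runs ImageDataSet._load for that file
In [5]: pikachu_array.shape               # a (height, width, 4) RGBA numpy array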

Adding Versioning

Note: Versioning doesn’t work with PartitionedDataSet. You can’t use both of them at the same time.

To add versioning support to our dataset, at the minimum, we need to extend AbstractVersionedDataSet to:

  • Accept a version keyword argument as part of the constructor; and
  • Adapt the _save and _load methods to use the versioned data paths obtained from _get_save_path and _get_load_path respectively.

In our example, out of the box, the following implementation will load and save data to and from data/01_raw/pokemon-images-and-types/images/images/pikachu.png/<version>/pikachu.png with version being a datetime-formatted string YYYY-MM-DDThh.mm.ss.sssZ by default:

from pathlib import PurePosixPath
from typing import Any, Dict

from kedro.io import AbstractVersionedDataSet, Version
from kedro.io.core import get_filepath_str, get_protocol_and_path

import fsspec
import numpy as np
from PIL import Image


class ImageDataSet(AbstractVersionedDataSet):
    """``ImageDataSet`` loads / save image data from a given filepath as `numpy` array using Pillow.

    Example:
    ::

        >>> ImageDataSet(filepath='/img/file/path.png')
    """

    def __init__(self, filepath: str, version: Version = None):
        """Creates a new instance of ImageDataSet to load / save image data for given filepath.

        Args:
            filepath: The location of the image file to load / save data.
            version: The version of the dataset being saved and loaded.
        """
        protocol, path = get_protocol_and_path(filepath)
        self._protocol = protocol
        self._fs = fsspec.filesystem(self._protocol)

        super().__init__(
            filepath=PurePosixPath(path),
            version=version,
            exists_function=self._fs.exists,
            glob_function=self._fs.glob,
        )

    def _load(self) -> np.ndarray:
        """Loads data from the image file.

        Returns:
            Data from the image file as a numpy array
        """
        load_path = get_filepath_str(self._get_load_path(), self._protocol)
        with self._fs.open(load_path, mode="rb") as f:
            image = Image.open(f).convert('RGBA')
            return np.asarray(image)

    def _save(self, data: np.ndarray) -> None:
        """Saves image data to the specified filepath.
        """
        save_path = get_filepath_str(self._get_save_path(), self._protocol)
        with self._fs.open(save_path, mode="wb") as f:
            image = Image.fromarray(data)
            image.save(f)

    def _describe(self) -> Dict[str, Any]:
        """Returns a dict that describes the attributes of the dataset.
        """
        return dict(
            filepath=self._filepath,
            version=self._version,
            protocol=self._protocol
        )

To test it out, first enable versioning support in our data catalog:

# in conf/base/catalog.yml

pikachu:
  type: kedro_pokemon.io.datasets.image_dataset.ImageDataSet
  filepath: data/01_raw/pokemon-images-and-types/images/images/pikachu.png
  versioned: true

Note: Using an HTTP(S)-based filepath with versioned: true is NOT supported.

Then create an initial version of the data by moving the existing file into a 2020-02-22T00.00.00.000Z directory as an example first version:

$ mv data/01_raw/pokemon-images-and-types/images/images/pikachu.png data/01_raw/pokemon-images-and-types/images/images/pikachu.png.backup
$ mkdir -p data/01_raw/pokemon-images-and-types/images/images/pikachu.png/2020-02-22T00.00.00.000Z/
$ mv data/01_raw/pokemon-images-and-types/images/images/pikachu.png.backup data/01_raw/pokemon-images-and-types/images/images/pikachu.png/2020-02-22T00.00.00.000Z/pikachu.png

The directory structure should look like the following:

data/01_raw/pokemon-images-and-types/images/images/pikachu.png
└── 2020-02-22T00.00.00.000Z/
    └── pikachu.png

Then launch an IPython shell to try loading and saving versioned data:

# loading works because Kedro automatically finds the latest available version inside the `pikachu.png` directory
In [1]: img = context.catalog.load('pikachu')
# then saving it should work as well
In [2]: context.catalog.save('pikachu', data=img)

If you inspect the contents of the data directory, you will notice that a new version of the data was written by the save call. Try calling save a few more times on your dataset and observe the changes in the data directory. That’s a versioned dataset in action. See the Versioning API documentation for a more in-depth treatment.
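
For example, after a couple of additional save calls, the version directory might look like this (your timestamps will differ):

$ ls data/01_raw/pokemon-images-and-types/images/images/pikachu.png/
2020-02-22T00.00.00.000Z  2021-03-01T12.00.00.000Z  2021-03-01T12.01.30.500Z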

Thread-safety consideration

Every Kedro dataset should work with both the SequentialRunner and the ParallelRunner. Therefore, it must be fully serialisable by Python’s multiprocessing package, i.e. datasets should not make use of lambda functions, nested functions, closures, etc. If you are using custom decorators, ensure they use functools.wraps().

To verify whether your dataset is serialisable by multiprocessing, try dumping it using multiprocessing.reduction.ForkingPickler:

from multiprocessing.reduction import ForkingPickler

dataset = context.catalog._data_sets['pokemon']

# the following call shouldn't throw any errors
ForkingPickler.dumps(dataset)

Handling credentials and different filesystems

Kedro allows you to pass credentials as well as filesystem-specific fs_args parameters to your dataset if your use case requires them. For example, if the Pokémon data reside in an S3 bucket, we can add the credentials and fs_args to the data catalog as follows:

# in conf/base/catalog.yml

pikachu:
  type: kedro_pokemon.io.datasets.image_dataset.ImageDataSet
  filepath: s3://data/01_raw/pokemon-images-and-types/images/images/pikachu.png
  credentials: <your_credentials>
  fs_args:
    arg_1: <value>

These parameters are then passed to the dataset constructor so you can use them accordingly with fsspec:

from copy import deepcopy
from typing import Any, Dict

import fsspec

from kedro.io import AbstractDataSet, Version
from kedro.io.core import get_protocol_and_path


class ImageDataSet(AbstractDataSet):
    def __init__(
        self,
        filepath: str,
        version: Version = None,
        credentials: Dict[str, Any] = None,
        fs_args: Dict[str, Any] = None,
    ):
        """Creates a new instance of ImageDataSet to load / save image data for the given filepath.

        Args:
            filepath: The location of the image file to load / save data.
            version: The version of the dataset being saved and loaded.
            credentials: Credentials required to get access to the underlying filesystem.
                E.g. for ``GCSFileSystem`` it should look like `{"token": None}`.
            fs_args: Extra arguments to pass into underlying filesystem class.
                E.g. for ``GCSFileSystem`` class: `{"project": "my-project", ...}`.
        """
        _credentials = deepcopy(credentials) or {}
        _fs_args = deepcopy(fs_args) or {}
        protocol, path = get_protocol_and_path(filepath)
        self._protocol = protocol
        self._fs = fsspec.filesystem(self._protocol, **_credentials, **_fs_args)
    ...
  • For more examples of how to use these parameters through the data catalog’s YAML API, please read this user guide.
  • For an example implementation on how to use these parameters in your dataset’s constructor, please see the SparkDataSet’s implementation.

Contribute your dataset to Kedro

After you are happy with how your new dataset behaves in your project, if you believe it can benefit other members of the community, please consider contributing it to Kedro. The process is simple:

  • Add your dataset package to kedro/extras/datasets/. For example, in our ImageDataSet example, the directory structure should be:
kedro/extras/datasets/image
├── __init__.py
└── image_dataset.py
  • If the dataset is complicated, you are encouraged to create a README.md file that explains how the dataset works and documents its API.
  • The dataset should be accompanied by full test coverage, located under tests/extras/datasets accordingly; see the sketch after this list for a minimal example test.
  • Make a Pull Request against the develop branch in the kedro repository. For more information, please read our contributing guide.
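
As an illustration of the testing point above, here is a minimal pytest sketch of a round-trip test for the ImageDataSet built in this tutorial. The test name and the use of pytest’s built-in tmp_path fixture are our own; real Kedro dataset test suites are considerably more thorough:

import numpy as np

from kedro_pokemon.io.datasets.image_dataset import ImageDataSet


def test_save_and_load_round_trip(tmp_path):
    """Saving an RGBA array and loading it back should preserve the data."""
    dataset = ImageDataSet(filepath=(tmp_path / "test.png").as_posix())

    # a tiny 4x4 RGBA image; PNG is lossless, so the round trip is exact
    data = np.zeros((4, 4, 4), dtype=np.uint8)

    dataset.save(data)
    reloaded = dataset.load()

    assert np.array_equal(data, reloaded)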