Dataset transformers (deprecated)¶
Warning
The transformer API will be deprecated in 0.18.0. We recommend using the before_dataset_loaded/after_dataset_loaded and before_dataset_saved/after_dataset_saved Hooks to customise the dataset load and save methods where appropriate.
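As a rough illustration of the recommended Hooks approach, the sketch below times dataset loads with the before_dataset_loaded and after_dataset_loaded Hooks. The ProfileTimeHooks class and its load_durations attribute are hypothetical names chosen for this example, and the import fallback exists only so that the snippet can run without Kedro installed.

```python
import logging
import time
from typing import Any, Dict

try:
    from kedro.framework.hooks import hook_impl
except ImportError:  # fallback so the sketch can run without Kedro installed

    def hook_impl(func):
        return func


class ProfileTimeHooks:
    """Hypothetical Hooks class that logs how long each dataset load takes."""

    def __init__(self) -> None:
        self._load_started: Dict[str, float] = {}
        self.load_durations: Dict[str, float] = {}

    @property
    def _logger(self):
        return logging.getLogger(self.__class__.__name__)

    @hook_impl
    def before_dataset_loaded(self, dataset_name: str) -> None:
        # Remember when the load of this dataset started.
        self._load_started[dataset_name] = time.perf_counter()

    @hook_impl
    def after_dataset_loaded(self, dataset_name: str, data: Any) -> None:
        started = self._load_started.pop(dataset_name, None)
        if started is None:
            return  # no matching "before" call was seen for this dataset
        elapsed = time.perf_counter() - started
        self.load_durations[dataset_name] = elapsed
        self._logger.info("Loading %s took %0.3f seconds", dataset_name, elapsed)
```

A class like this would be registered through the HOOKS variable in settings.py, in the same way as the TransformerHooks class shown later on this page.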
As we describe in the documentation about how Kedro works with data, Kedro transformers intercept the load and save operations on Kedro DataSets.
Use cases for Kedro transformers include:
Data validation
Operation performance tracking
Data format conversion (although we would recommend Transcoding for this)
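To make the interception idea concrete, here is a minimal sketch of the data validation use case. It does not import Kedro: NonEmptyValidator is a hypothetical class that mirrors the load and save signatures used by the transformer built later on this page, and in a real project it would inherit from kedro.io.AbstractTransformer. The list named storage stands in for a dataset's own load and save operations.

```python
from typing import Any, Callable


class NonEmptyValidator:
    """Hypothetical transformer-style class that rejects empty data.

    In a Kedro project this would subclass kedro.io.AbstractTransformer;
    it is a plain class here so that the sketch runs on its own.
    """

    def load(self, data_set_name: str, load: Callable[[], Any]) -> Any:
        data = load()  # call through to the wrapped dataset load
        self._check(data_set_name, data)
        return data

    def save(self, data_set_name: str, save: Callable[[Any], None], data: Any) -> None:
        self._check(data_set_name, data)  # validate before anything is written
        save(data)

    @staticmethod
    def _check(data_set_name: str, data: Any) -> None:
        if data is None or len(data) == 0:
            raise ValueError(f"Dataset '{data_set_name}' is empty")


# Stand-ins for a dataset's own save and load operations.
storage = []
validator = NonEmptyValidator()
validator.save("example", storage.extend, [1, 2, 3])
loaded = validator.load("example", lambda: list(storage))
```

Because the check runs before save is called, invalid data raises an error without ever reaching the underlying dataset.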
Develop your own dataset transformer¶
To illustrate the use case for operation performance tracking, this section demonstrates how to build a transformer to track memory consumption. In fact, Kedro provides a built-in memory profiler, but this example shows how to build your own, using memory-profiler.
Note
To work with this example, you need to pip install memory_profiler before you start.
A custom transformer should:
Inherit from the kedro.io.AbstractTransformer base class
Implement the load and save methods
Within the project in which you want to use the transformer, create a file in src/<package_name>/ called memory_profile.py and paste the following code into it:
import logging
from typing import Callable, Any

from kedro.io import AbstractTransformer
from memory_profiler import memory_usage


def _normalise_mem_usage(mem_usage):
    # memory_profiler < 0.56.0 returns list instead of float
    return mem_usage[0] if isinstance(mem_usage, (list, tuple)) else mem_usage


class ProfileMemoryTransformer(AbstractTransformer):
    """A transformer that logs the maximum memory consumption during load and save calls"""

    @property
    def _logger(self):
        return logging.getLogger(self.__class__.__name__)

    def load(self, data_set_name: str, load: Callable[[], Any]) -> Any:
        mem_usage, data = memory_usage(
            (load, [], {}),
            interval=0.1,
            max_usage=True,
            retval=True,
            include_children=True,
        )
        # memory_profiler < 0.56.0 returns list instead of float
        mem_usage = _normalise_mem_usage(mem_usage)

        self._logger.info(
            "Loading %s consumed %2.2fMiB memory at peak time", data_set_name, mem_usage
        )
        return data

    def save(self, data_set_name: str, save: Callable[[Any], None], data: Any) -> None:
        mem_usage = memory_usage(
            (save, [data], {}),
            interval=0.1,
            max_usage=True,
            retval=False,
            include_children=True,
        )
        mem_usage = _normalise_mem_usage(mem_usage)

        self._logger.info(
            "Saving %s consumed %2.2fMiB memory at peak time", data_set_name, mem_usage
        )
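If installing memory_profiler is not an option, a rough alternative is the standard library's tracemalloc module. The sketch below is not part of Kedro and swaps in a different measurement technique: peak_python_memory_mib is a hypothetical helper that runs a callable and reports the peak memory traced while it ran. tracemalloc only tracks allocations made through Python's memory allocators, so native allocations inside C extensions may be missed and the numbers will not match memory_profiler's process-level measurements.

```python
import tracemalloc
from typing import Any, Callable, Tuple


def peak_python_memory_mib(func: Callable[[], Any]) -> Tuple[float, Any]:
    """Run func and return (peak traced memory in MiB, func's return value).

    Assumes tracemalloc is not already tracing elsewhere in the process,
    because tracing is stopped again once func has finished.
    """
    tracemalloc.start()
    try:
        result = func()
        _, peak_bytes = tracemalloc.get_traced_memory()
    finally:
        tracemalloc.stop()
    return peak_bytes / (1024 * 1024), result


# Stand-in for a dataset load: build a list of 100,000 integers.
peak_mib, data = peak_python_memory_mib(lambda: list(range(100_000)))
print(f"Loading consumed {peak_mib:.2f}MiB memory at peak time")
```

Inside a transformer, the load callable could be passed to such a helper in place of the memory_usage call, at the cost of less complete measurements.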
Next, you need to update TransformerHooks to apply your custom transformer. Add the following to a hooks.py file in your project.
...
from .memory_profile import ProfileMemoryTransformer  # new import


class TransformerHooks:
    @hook_impl
    def after_catalog_created(self, catalog: DataCatalog) -> None:
        catalog.add_transformer(ProfileTimeTransformer())

        # as memory tracking is quite time-consuming, for demonstration purposes
        # let's apply profile_memory only to the model_input_table
        catalog.add_transformer(ProfileMemoryTransformer(), "model_input_table")
Finally, update the HOOKS variable in settings.py as follows:
HOOKS = (TransformerHooks(),)
Then re-run the pipeline:
$ kedro run
The output should look similar to the following:
...
2019-11-13 15:55:01,674 - kedro.io.data_catalog - INFO - Saving data to `model_input_table` (CSVDataSet)...
2019-11-13 15:55:12,322 - ProfileMemoryTransformer - INFO - Saving model_input_table consumed 606.98MiB memory at peak time
2019-11-13 15:55:12,322 - ProfileTimeTransformer - INFO - Saving model_input_table took 10.648 seconds
2019-11-13 15:55:12,357 - kedro.runner.sequential_runner - INFO - Completed 3 out of 6 tasks
2019-11-13 15:55:12,358 - kedro.io.data_catalog - INFO - Loading data from `model_input_table` (CSVDataSet)...
2019-11-13 15:55:13,933 - ProfileMemoryTransformer - INFO - Loading model_input_table consumed 533.05MiB memory at peak time
2019-11-13 15:55:13,933 - ProfileTimeTransformer - INFO - Loading model_input_table took 1.576 seconds
...