Source code for kedro.io.cached_dataset

"""
This module contains ``CachedDataSet``, a dataset wrapper which caches in memory the data saved,
so that the user avoids io operations with slow storage media
"""
import logging
from typing import Any, Dict, Union

from kedro.io.core import VERSIONED_FLAG_KEY, AbstractDataSet, Version
from kedro.io.memory_dataset import MemoryDataSet


class CachedDataSet(AbstractDataSet):
    """``CachedDataSet`` is a dataset wrapper which caches in memory the data saved,
    so that the user avoids I/O operations with slow storage media.

    You can also specify a ``CachedDataSet`` in catalog.yml:
    ::

        >>> test_ds:
        >>>    type: CachedDataSet
        >>>    versioned: true
        >>>    dataset:
        >>>       type: pandas.CSVDataSet
        >>>       filepath: example.csv

    Please note that if your dataset is versioned, this should be indicated in the
    wrapper class as shown above.
    """
    def __init__(
        self,
        dataset: Union[AbstractDataSet, Dict],
        version: Version = None,
        copy_mode: str = None,
    ):
        """Creates a new instance of ``CachedDataSet`` pointing to the
        provided Python object.

        Args:
            dataset: A Kedro DataSet object or a dictionary to cache.
            version: If specified, should be an instance of
                ``kedro.io.core.Version``. If its ``load`` attribute is
                None, the latest version will be loaded. If its ``save``
                attribute is None, save version will be autogenerated.
            copy_mode: The copy mode used to copy the data. Possible
                values are: "deepcopy", "copy" and "assign". If not
                provided, it is inferred based on the data type.

        Raises:
            ValueError: If the provided dataset is not a valid dict/YAML
                representation of a dataset or an actual dataset.
        """
        if isinstance(dataset, dict):
            self._dataset = self._from_config(dataset, version)
        elif isinstance(dataset, AbstractDataSet):
            self._dataset = dataset
        else:
            raise ValueError(
                "The argument type of `dataset` should be either a dict/YAML "
                "representation of the dataset, or the actual dataset object."
            )
        self._cache = MemoryDataSet(copy_mode=copy_mode)
    def _release(self) -> None:
        self._cache.release()
        self._dataset.release()

    @staticmethod
    def _from_config(config, version):
        if VERSIONED_FLAG_KEY in config:
            raise ValueError(
                "Cached datasets should specify that they are versioned in the "
                "`CachedDataSet`, not in the wrapped dataset."
            )
        if version:
            config[VERSIONED_FLAG_KEY] = True
            return AbstractDataSet.from_config(
                "_cached", config, version.load, version.save
            )
        return AbstractDataSet.from_config("_cached", config)

    def _describe(self) -> Dict[str, Any]:
        return {
            "dataset": self._dataset._describe(),  # pylint: disable=protected-access
            "cache": self._cache._describe(),  # pylint: disable=protected-access
        }

    def _load(self):
        data = self._cache.load() if self._cache.exists() else self._dataset.load()

        if not self._cache.exists():
            self._cache.save(data)

        return data

    def _save(self, data: Any) -> None:
        self._dataset.save(data)
        self._cache.save(data)

    def _exists(self) -> bool:
        return self._cache.exists() or self._dataset.exists()

    def __getstate__(self):
        # clearing the cache can be prevented by modifying
        # how parallel runner handles datasets (not trivial!)
        logging.getLogger(__name__).warning("%s: clearing cache to pickle.", str(self))
        self._cache.release()
        return self.__dict__
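
For illustration, here is a minimal usage sketch (not part of the kedro source). It assumes kedro with the pandas datasets installed; ``example.csv`` is a hypothetical path that mirrors the catalog.yml entry in the class docstring. Saving writes through to disk and fills the in-memory cache, so the subsequent load never touches the file.

    import pandas as pd

    from kedro.io import CachedDataSet

    # Wrap a CSV dataset via a dict config, the programmatic
    # equivalent of the catalog.yml entry shown above.
    # "example.csv" is a hypothetical path used only for this sketch.
    cached = CachedDataSet(
        dataset={"type": "pandas.CSVDataSet", "filepath": "example.csv"},
    )

    df = pd.DataFrame({"a": [1, 2]})
    cached.save(df)        # writes example.csv and populates the cache
    again = cached.load()  # served from the in-memory cache, no file read

Passing ``copy_mode="assign"`` to the constructor would make the cache hand back the same object without copying, which is cheaper for large data but exposes the cached object to downstream mutation.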