Source code for kedro.runner.sequential_runner

"""``SequentialRunner`` is an ``AbstractRunner`` implementation. It can be
used to run the ``Pipeline`` in a sequential manner using a topological sort
of provided nodes.
"""

from collections import Counter
from itertools import chain

from pluggy import PluginManager

from kedro.io import AbstractDataSet, DataCatalog, MemoryDataSet
from kedro.pipeline import Pipeline
from kedro.runner.runner import AbstractRunner, run_node


class SequentialRunner(AbstractRunner):
    """``SequentialRunner`` is an ``AbstractRunner`` implementation. It can be
    used to run the ``Pipeline`` in a sequential manner using a topological sort
    of provided nodes.
    """
    def __init__(self, is_async: bool = False):
        """Instantiates the runner class.

        Args:
            is_async: If True, the node inputs and outputs are loaded and saved
                asynchronously with threads. Defaults to False.
        """
        super().__init__(is_async=is_async)
    def create_default_data_set(self, ds_name: str) -> AbstractDataSet:
        """Factory method for creating the default data set for the runner.

        Args:
            ds_name: Name of the missing data set.

        Returns:
            An instance of an implementation of ``AbstractDataSet`` to be
            used for all unregistered data sets.
        """
        return MemoryDataSet()
    def _run(
        self,
        pipeline: Pipeline,
        catalog: DataCatalog,
        hook_manager: PluginManager,
        session_id: str = None,
    ) -> None:
        """The method implementing sequential pipeline running.

        Args:
            pipeline: The ``Pipeline`` to run.
            catalog: The ``DataCatalog`` from which to fetch data.
            hook_manager: The ``PluginManager`` to activate hooks.
            session_id: The id of the session.

        Raises:
            Exception: in case of any downstream node failure.
        """
        nodes = pipeline.nodes
        done_nodes = set()

        load_counts = Counter(chain.from_iterable(n.inputs for n in nodes))

        for exec_index, node in enumerate(nodes):
            try:
                run_node(node, catalog, hook_manager, self._is_async, session_id)
                done_nodes.add(node)
            except Exception:
                self._suggest_resume_scenario(pipeline, done_nodes)
                raise

            # Decrement load counts and release any data sets we've finished with
            for data_set in node.inputs:
                load_counts[data_set] -= 1
                if load_counts[data_set] < 1 and data_set not in pipeline.inputs():
                    catalog.release(data_set)
            for data_set in node.outputs:
                if load_counts[data_set] < 1 and data_set not in pipeline.outputs():
                    catalog.release(data_set)

            self._logger.info(
                "Completed %d out of %d tasks", exec_index + 1, len(nodes)
            )
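
The sketch below shows one way ``SequentialRunner`` can be exercised on its own, outside a Kedro session. The dataset names (``xs``, ``doubled``) and the ``double`` function are illustrative assumptions, not part of the module above, and the call to ``run()`` without an explicit ``hook_manager`` assumes a Kedro version where that argument is optional; otherwise a configured ``PluginManager`` must be passed.

# Minimal usage sketch (illustrative; names below are assumptions, not part of
# the module above).
from kedro.io import DataCatalog, MemoryDataSet
from kedro.pipeline import node, pipeline
from kedro.runner import SequentialRunner


def double(xs):
    return [x * 2 for x in xs]


catalog = DataCatalog({"xs": MemoryDataSet([1, 2, 3])})
doubling_pipeline = pipeline([node(double, inputs="xs", outputs="doubled")])

# "doubled" is not registered in the catalog, so create_default_data_set()
# backs it with a MemoryDataSet and run() returns it as a free output.
# Passing is_async=True instead would load/save node data in threads.
outputs = SequentialRunner().run(doubling_pipeline, catalog)
print(outputs["doubled"])  # [2, 4, 6]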