Source code for kedro.runner.sequential_runner

"""``SequentialRunner`` is an ``AbstractRunner`` implementation. It can be
used to run the ``Pipeline`` in a sequential manner using a topological sort
of provided nodes.
"""

from collections import Counter
from itertools import chain

from kedro.io import AbstractDataSet, DataCatalog, MemoryDataSet
from kedro.pipeline import Pipeline
from kedro.runner.runner import AbstractRunner, run_node


class SequentialRunner(AbstractRunner):
    """``SequentialRunner`` is an ``AbstractRunner`` implementation. It can
    be used to run the ``Pipeline`` in a sequential manner using a
    topological sort of provided nodes.
    """
    def __init__(self, is_async: bool = False):
        """Instantiates the runner class.

        Args:
            is_async: If True, the node inputs and outputs are loaded and
                saved asynchronously with threads. Defaults to False.
        """
        super().__init__(is_async=is_async)
    def create_default_data_set(self, ds_name: str) -> AbstractDataSet:
        """Factory method for creating the default data set for the runner.

        Args:
            ds_name: Name of the missing data set.

        Returns:
            An instance of an implementation of ``AbstractDataSet`` to be
            used for all unregistered data sets.
        """
        return MemoryDataSet()
    def _run(
        self, pipeline: Pipeline, catalog: DataCatalog, run_id: str = None
    ) -> None:
        """The method implementing sequential pipeline running.

        Args:
            pipeline: The ``Pipeline`` to run.
            catalog: The ``DataCatalog`` from which to fetch data.
            run_id: The id of the run.

        Raises:
            Exception: in case of any downstream node failure.
        """
        nodes = pipeline.nodes
        done_nodes = set()

        # Count how many nodes still need to load each data set.
        load_counts = Counter(chain.from_iterable(n.inputs for n in nodes))

        for exec_index, node in enumerate(nodes):
            try:
                run_node(node, catalog, self._is_async, run_id)
                done_nodes.add(node)
            except Exception:
                self._suggest_resume_scenario(pipeline, done_nodes)
                raise

            # Decrement load counts and release any data sets we've finished with.
            for data_set in node.inputs:
                load_counts[data_set] -= 1
                if load_counts[data_set] < 1 and data_set not in pipeline.inputs():
                    catalog.release(data_set)
            for data_set in node.outputs:
                if load_counts[data_set] < 1 and data_set not in pipeline.outputs():
                    catalog.release(data_set)

            self._logger.info(
                "Completed %d out of %d tasks", exec_index + 1, len(nodes)
            )
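
A minimal usage sketch (not part of the module above), assuming this Kedro release exposes ``SequentialRunner`` via ``kedro.runner`` and that ``AbstractRunner.run`` returns any outputs not registered in the catalog. The ``double`` function, data set names and values are illustrative only.

from kedro.io import DataCatalog, MemoryDataSet
from kedro.pipeline import Pipeline, node
from kedro.runner import SequentialRunner


def double(x):
    return x * 2


# Register only the pipeline input; the unregistered "output" data set is
# created on the fly via create_default_data_set() as a MemoryDataSet.
catalog = DataCatalog({"x": MemoryDataSet(data=21)})
pipeline = Pipeline([node(double, inputs="x", outputs="output")])

runner = SequentialRunner()  # pass is_async=True to load/save with threads
result = runner.run(pipeline, catalog)
print(result)  # expected: {"output": 42}

Because "output" is not registered in the catalog, ``run`` returns it in the result dictionary, keyed by data set name.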