"""``SequentialRunner`` is an ``AbstractRunner`` implementation. It can be
used to run the ``Pipeline`` in a sequential manner using a topological sort
of provided nodes.
"""
from collections import Counter
from itertools import chain
from kedro.io import AbstractDataSet, DataCatalog, MemoryDataSet
from kedro.pipeline import Pipeline
from kedro.runner.runner import AbstractRunner, run_node
class SequentialRunner(AbstractRunner):
    """``SequentialRunner`` is an ``AbstractRunner`` implementation. It can
    be used to run the ``Pipeline`` in a sequential manner using a
    topological sort of provided nodes.
    """

    def __init__(self, is_async: bool = False):
        """Instantiates the runner class.

        Args:
            is_async: If True, the node inputs and outputs are loaded and saved
                asynchronously with threads. Defaults to False.
        """
        super().__init__(is_async=is_async)

    def create_default_data_set(self, ds_name: str) -> AbstractDataSet:
        """Factory method for creating the default data set for the runner.

        Args:
            ds_name: Name of the missing data set.

        Returns:
            An instance of an implementation of ``AbstractDataSet`` to be
            used for all unregistered data sets.
        """
        return MemoryDataSet()

    def _run(
        self, pipeline: Pipeline, catalog: DataCatalog, run_id: str = None
    ) -> None:
        """The method implementing sequential pipeline running.

        Args:
            pipeline: The ``Pipeline`` to run.
            catalog: The ``DataCatalog`` from which to fetch data.
            run_id: The id of the run.

        Raises:
            Exception: In case of any downstream node failure.
        """
        nodes = pipeline.nodes
        done_nodes = set()

        # Count how many nodes consume each data set so intermediate
        # results can be released as soon as they are no longer needed.
        load_counts = Counter(chain.from_iterable(n.inputs for n in nodes))

        for exec_index, node in enumerate(nodes):
            try:
                run_node(node, catalog, self._is_async, run_id)
                done_nodes.add(node)
            except Exception:
                # Tell the user how to resume from the last successful node
                # before propagating the failure.
                self._suggest_resume_scenario(pipeline, done_nodes)
                raise

            # Decrement load counts and release any data sets we've finished with.
            for data_set in node.inputs:
                load_counts[data_set] -= 1
                # Never release free (pipeline-level) inputs.
                if load_counts[data_set] < 1 and data_set not in pipeline.inputs():
                    catalog.release(data_set)
            for data_set in node.outputs:
                # Counter defaults to 0 for outputs no downstream node consumes;
                # keep pipeline-level outputs so callers can still access them.
                if load_counts[data_set] < 1 and data_set not in pipeline.outputs():
                    catalog.release(data_set)

            self._logger.info(
                "Completed %d out of %d tasks", exec_index + 1, len(nodes)
            )