Source code for kedro.runner.sequential_runner

# Copyright 2020 QuantumBlack Visual Analytics Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND
# NONINFRINGEMENT. IN NO EVENT WILL THE LICENSOR OR OTHER CONTRIBUTORS
# BE LIABLE FOR ANY CLAIM, DAMAGES, OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF, OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#
# The QuantumBlack Visual Analytics Limited ("QuantumBlack") name and logo
# (either separately or in combination, "QuantumBlack Trademarks") are
# trademarks of QuantumBlack. The License does not grant you any right or
# license to the QuantumBlack Trademarks. You may not use the QuantumBlack
# Trademarks or any confusingly similar mark as a trademark for your product,
# or use the QuantumBlack Trademarks in any other manner that might cause
# confusion in the marketplace, including but not limited to in advertising,
# on websites, or on software.
#
# See the License for the specific language governing permissions and
# limitations under the License.
"""``SequentialRunner`` is an ``AbstractRunner`` implementation. It can be
used to run the ``Pipeline`` in a sequential manner using a topological sort
of provided nodes.
"""

from collections import Counter
from itertools import chain
from typing import Optional

from kedro.io import AbstractDataSet, DataCatalog, MemoryDataSet
from kedro.pipeline import Pipeline
from kedro.runner.runner import AbstractRunner, run_node


class SequentialRunner(AbstractRunner):
    """``SequentialRunner`` is an ``AbstractRunner`` implementation. It can
    be used to run the ``Pipeline`` in a sequential manner using a
    topological sort of provided nodes.
    """

    def __init__(self, is_async: bool = False):
        """Instantiates the runner class.

        Args:
            is_async: If True, the node inputs and outputs are loaded and
                saved asynchronously with threads. Defaults to False.
        """
        super().__init__(is_async=is_async)

    def create_default_data_set(self, ds_name: str) -> AbstractDataSet:
        """Factory method for creating the default data set for the runner.

        Args:
            ds_name: Name of the missing data set.

        Returns:
            An instance of an implementation of ``AbstractDataSet`` to be
            used for all unregistered data sets.
        """
        # Unregistered data sets are simply held in memory for the duration
        # of the run; ``ds_name`` is part of the factory interface but is
        # not needed to construct a MemoryDataSet.
        return MemoryDataSet()

    def _run(
        self,
        pipeline: Pipeline,
        catalog: DataCatalog,
        run_id: Optional[str] = None,
    ) -> None:
        """The method implementing sequential pipeline running.

        Args:
            pipeline: The ``Pipeline`` to run.
            catalog: The ``DataCatalog`` from which to fetch data.
            run_id: The id of the run.

        Raises:
            Exception: in case of any downstream node failure.
        """
        nodes = pipeline.nodes
        done_nodes = set()

        # Number of nodes that still need each data set as an input, so
        # each data set can be released as soon as it is no longer needed.
        load_counts = Counter(chain.from_iterable(n.inputs for n in nodes))

        for exec_index, node in enumerate(nodes):
            try:
                run_node(node, catalog, self._is_async, run_id)
                done_nodes.add(node)
            except Exception:
                # Tell the user how to resume from the point of failure
                # before propagating the original error.
                self._suggest_resume_scenario(pipeline, done_nodes)
                raise

            # Decrement load counts and release any data sets we've finished
            # with. Pipeline-level inputs/outputs are kept alive for callers.
            for data_set in node.inputs:
                load_counts[data_set] -= 1
                if load_counts[data_set] < 1 and data_set not in pipeline.inputs():
                    catalog.release(data_set)
            for data_set in node.outputs:
                if load_counts[data_set] < 1 and data_set not in pipeline.outputs():
                    catalog.release(data_set)

            self._logger.info(
                "Completed %d out of %d tasks", exec_index + 1, len(nodes)
            )