"""Helper to integrate modular pipelines into a master pipeline."""
import copy
from typing import AbstractSet, Dict, Iterable, List, Set, Union
from kedro.pipeline.node import Node
from kedro.pipeline.pipeline import (
TRANSCODING_SEPARATOR,
Pipeline,
_strip_transcoding,
_transcode_split,
)
# Markers used for parameter references (as opposed to dataset names).
_PARAMETER_KEYWORDS = ("params:", "parameters")
class ModularPipelineError(Exception):
    """Raised when a modular pipeline is not adapted and integrated
    appropriately using the helper.

    In this module it signals that the ``inputs``, ``outputs`` or
    ``parameters`` mapping is invalid for the pipeline being wrapped.
    """
def _is_parameter(name: str) -> bool:
return any(name.startswith(param) for param in _PARAMETER_KEYWORDS)
def _validate_inputs_outputs(
    inputs: AbstractSet[str], outputs: AbstractSet[str], pipe: Pipeline
) -> None:
    """Check that the requested inputs/outputs are compatible with ``pipe``.

    Names are compared with their transcoding suffix stripped. The checks are:
        - no parameter is listed among the inputs
        - every input is one of the pipeline's free inputs
        - no output is one of the pipeline's free inputs

    Raises:
        ModularPipelineError: If any of the checks above fails.
    """
    bare_inputs = set(map(_strip_transcoding, inputs))
    bare_outputs = set(map(_strip_transcoding, outputs))

    for candidate in bare_inputs:
        if _is_parameter(candidate):
            raise ModularPipelineError(
                "Parameters should be specified in the `parameters` argument"
            )

    pipeline_free_inputs = set(map(_strip_transcoding, pipe.inputs()))

    if not bare_inputs.issubset(pipeline_free_inputs):
        raise ModularPipelineError("Inputs should be free inputs to the pipeline")

    if not bare_outputs.isdisjoint(pipeline_free_inputs):
        raise ModularPipelineError("Outputs can't contain free inputs to the pipeline")
def _validate_datasets_exist(
    inputs: AbstractSet[str],
    outputs: AbstractSet[str],
    parameters: AbstractSet[str],
    pipe: Pipeline,
) -> None:
    """Check that every name to be mapped is present in ``pipe``.

    Input and output names, as well as the pipeline's own dataset names, are
    compared with their transcoding suffix stripped; parameter names are
    compared as given.

    Raises:
        ModularPipelineError: If at least one name is not found among the
            pipeline's datasets; the message lists the names alphabetically.
    """
    bare_inputs = set(map(_strip_transcoding, inputs))
    bare_outputs = set(map(_strip_transcoding, outputs))
    known = set(map(_strip_transcoding, pipe.data_sets()))

    requested = bare_inputs | bare_outputs | parameters
    missing = requested - known
    if not missing:
        return

    listing = ", ".join(sorted(missing))
    raise ModularPipelineError(
        f"Failed to map datasets and/or parameters: {listing}"
    )
def pipeline(
    pipe: Union[Iterable[Union[Node, Pipeline]], Pipeline],
    *,
    inputs: Union[None, str, Set[str], Dict[str, str]] = None,
    outputs: Union[None, str, Set[str], Dict[str, str]] = None,
    parameters: Union[None, Dict[str, str]] = None,
    tags: Union[None, str, Iterable[str]] = None,
    namespace: Union[None, str] = None,
) -> Pipeline:
    """Create a ``Pipeline`` from a collection of nodes and/or ``Pipeline``s.

    Args:
        pipe: The nodes the ``Pipeline`` will be made of. If you
            provide pipelines among the list of nodes, those pipelines will
            be expanded and all their nodes will become part of this
            new pipeline.
        inputs: A name or collection of input names to be exposed as connection points
            to other pipelines upstream. This is optional; if not provided, the
            pipeline inputs are automatically inferred from the pipeline structure.
            When str or Set[str] is provided, the listed input names will stay
            the same as they are named in the provided pipeline.
            When Dict[str, str] is provided, current input names will be
            mapped to new names.
            Must only refer to the pipeline's free inputs.
        outputs: A name or collection of names to be exposed as connection points
            to other pipelines downstream. This is optional; if not provided, the
            pipeline outputs are automatically inferred from the pipeline structure.
            When str or Set[str] is provided, the listed output names will stay
            the same as they are named in the provided pipeline.
            When Dict[str, str] is provided, current output names will be
            mapped to new names.
            Can refer to both the pipeline's free outputs, as well as
            intermediate results that need to be exposed.
        parameters: A map of existing parameter to the new one.
        tags: Optional set of tags to be applied to all the pipeline nodes.
        namespace: A prefix to give to all dataset names,
            except those explicitly named with the `inputs`/`outputs`
            arguments, and parameter references (`params:` and `parameters`).

    Raises:
        ModularPipelineError: When inputs, outputs or parameters are incorrectly
            specified, or they do not exist on the original pipeline.
        ValueError: When underlying pipeline nodes inputs/outputs are not
            any of the expected types (str, dict, list, or None).

    Returns:
        A new ``Pipeline`` object.
    """
    if isinstance(pipe, Pipeline):
        # To ensure that we are always dealing with a *copy* of pipe.
        pipe = Pipeline([pipe], tags=tags)
    else:
        pipe = Pipeline(pipe, tags=tags)

    # Nothing to rename or namespace: the tagged copy is the result.
    if not any([inputs, outputs, parameters, namespace]):
        return pipe

    # pylint: disable=protected-access
    inputs = _to_dict(inputs)
    outputs = _to_dict(outputs)
    parameters = _to_dict(parameters)

    _validate_datasets_exist(inputs.keys(), outputs.keys(), parameters.keys(), pipe)
    _validate_inputs_outputs(inputs.keys(), outputs.keys(), pipe)

    # Single lookup table: current name -> new name.
    mapping = {**inputs, **outputs, **parameters}

    def _prefix(name: str) -> str:
        """Prepend the namespace to ``name`` when a namespace is set."""
        return f"{namespace}.{name}" if namespace else name

    def _is_transcode_base_in_mapping(name: str) -> bool:
        """Tell whether the part of ``name`` before the transcoding separator is mapped."""
        base_name, _ = _transcode_split(name)
        return base_name in mapping

    def _map_transcode_base(name: str) -> str:
        """Swap the base of a transcoded name for its mapped name, keeping the suffix."""
        base_name, transcode_suffix = _transcode_split(name)
        return TRANSCODING_SEPARATOR.join((mapping[base_name], transcode_suffix))

    # Ordered (predicate, processor) pairs; the first matching predicate wins,
    # so the order below is significant. Built once rather than on every
    # ``_rename`` call.
    rename_rules = (
        # if name mapped to new name, update with new name
        (lambda n: n in mapping, lambda n: mapping[n]),
        # if it's a parameter, leave as is (don't namespace)
        (_is_parameter, lambda n: n),
        # if transcode base is mapped to a new name, update with new base
        (_is_transcode_base_in_mapping, _map_transcode_base),
        # if namespace given, prefix name using that namespace
        (lambda n: bool(namespace), _prefix),
    )

    def _rename(name: str) -> str:
        """Return the new name for ``name`` according to ``rename_rules``."""
        for predicate, processor in rename_rules:
            if predicate(name):
                return processor(name)
        # leave name as is
        return name

    def _process_dataset_names(
        datasets: Union[None, str, List[str], Dict[str, str]]
    ) -> Union[None, str, List[str], Dict[str, str]]:
        """Rename every dataset name in a node's inputs/outputs specification.

        The container shape (None, str, list or dict) is preserved; for a
        dict only the values are renamed, the keys are kept untouched.
        """
        if datasets is None:
            return None
        if isinstance(datasets, str):
            return _rename(datasets)
        if isinstance(datasets, list):
            return [_rename(name) for name in datasets]
        if isinstance(datasets, dict):
            return {key: _rename(value) for key, value in datasets.items()}
        raise ValueError(  # pragma: no cover
            f"Unexpected input {datasets} of type {type(datasets)}"
        )

    def _copy_node(node: Node) -> Node:
        """Copy ``node`` with renamed inputs/outputs and a nested namespace."""
        new_namespace = node.namespace
        if namespace:
            # The new namespace wraps any namespace the node already has.
            new_namespace = (
                f"{namespace}.{node.namespace}" if node.namespace else namespace
            )
        return node._copy(
            inputs=_process_dataset_names(node._inputs),
            outputs=_process_dataset_names(node._outputs),
            namespace=new_namespace,
        )

    new_nodes = [_copy_node(n) for n in pipe.nodes]
    return Pipeline(new_nodes, tags=tags)
def _to_dict(element: Union[None, str, Set[str], Dict[str, str]]) -> Dict[str, str]:
if element is None:
return {}
if isinstance(element, str):
return {element: element}
if isinstance(element, dict):
return copy.deepcopy(element)
return {item: item for item in element}