"""Helper to integrate modular pipelines into a master pipeline."""
from __future__ import annotations
import copy
import difflib
from typing import AbstractSet, Iterable
from kedro.pipeline.node import Node
from kedro.pipeline.pipeline import Pipeline
from ._transcoding import TRANSCODING_SEPARATOR, _strip_transcoding, _transcode_split
class ModularPipelineError(Exception):
"""Raised when a modular pipeline is not adapted and integrated
appropriately using the helper.
"""
pass
def _is_all_parameters(name: str) -> bool:
return name == "parameters"
def _is_single_parameter(name: str) -> bool:
return name.startswith("params:")
def _is_parameter(name: str) -> bool:
return _is_single_parameter(name) or _is_all_parameters(name)
def _validate_inputs_outputs(
inputs: AbstractSet[str], outputs: AbstractSet[str], pipe: Pipeline
) -> None:
"""Safeguards to ensure that:
- parameters are not specified under inputs
- inputs are only free inputs
- outputs do not contain free inputs
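
    Example:
        An illustrative sketch (the node and dataset names are hypothetical):

        >>> from kedro.pipeline import Pipeline, node
        >>> p = Pipeline([node(lambda x: x, "raw", "clean", name="n1")])
        >>> _validate_inputs_outputs({"raw"}, {"clean"}, p)  # passes silently
        >>> # _validate_inputs_outputs({"clean"}, set(), p) would raise
        >>> # ModularPipelineError: "clean" is produced by a node, so it is
        >>> # not a free input of the pipeline.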
"""
inputs = {_strip_transcoding(k) for k in inputs}
outputs = {_strip_transcoding(k) for k in outputs}
if any(_is_parameter(i) for i in inputs):
raise ModularPipelineError(
"Parameters should be specified in the 'parameters' argument"
)
free_inputs = {_strip_transcoding(i) for i in pipe.inputs()}
if not inputs <= free_inputs:
raise ModularPipelineError(
"Inputs must not be outputs from another node in the same pipeline"
)
if outputs & free_inputs:
raise ModularPipelineError(
"All outputs must be generated by some node within the pipeline"
)
def _validate_datasets_exist(
inputs: AbstractSet[str],
outputs: AbstractSet[str],
parameters: AbstractSet[str],
pipe: Pipeline,
) -> None:
"""Validate that inputs, parameters and outputs map correctly onto the provided nodes."""
inputs = {_strip_transcoding(k) for k in inputs}
outputs = {_strip_transcoding(k) for k in outputs}
existing = {_strip_transcoding(ds) for ds in pipe.datasets()}
non_existent = (inputs | outputs | parameters) - existing
if non_existent:
sorted_non_existent = sorted(non_existent)
possible_matches = []
for non_existent_input in sorted_non_existent:
possible_matches += difflib.get_close_matches(non_existent_input, existing)
error_msg = f"Failed to map datasets and/or parameters onto the nodes provided: {', '.join(sorted_non_existent)}"
suggestions = (
f" - did you mean one of these instead: {', '.join(possible_matches)}"
if possible_matches
else ""
)
raise ModularPipelineError(error_msg + suggestions)
def _get_dataset_names_mapping(
names: str | set[str] | dict[str, str] | None = None,
) -> dict[str, str]:
"""Take a name or a collection of dataset names
and turn it into a mapping from the old dataset names to the provided ones if necessary.
Args:
names: A dataset name or collection of dataset names.
When str or set[str] is provided, the listed names will stay
the same as they are named in the provided pipeline.
When dict[str, str] is provided, current names will be
mapped to new names in the resultant pipeline.
Returns:
A dictionary that maps the old dataset names to the provided ones.
Examples:
>>> _get_dataset_names_mapping("dataset_name")
{"dataset_name": "dataset_name"} # a str name will stay the same
>>> _get_dataset_names_mapping(set(["ds_1", "ds_2"]))
{"ds_1": "ds_1", "ds_2": "ds_2"} # a set[str] of names will stay the same
>>> _get_dataset_names_mapping({"ds_1": "new_ds_1_name"})
{"ds_1": "new_ds_1_name"} # a dict[str, str] of names will map key to value
"""
if names is None:
return {}
if isinstance(names, str):
return {names: names}
if isinstance(names, dict):
return copy.deepcopy(names)
return {item: item for item in names}
def _normalize_param_name(name: str) -> str:
"""Make sure that a param name has a `params:` prefix before passing to the node"""
return name if name.startswith("params:") else f"params:{name}"
def _get_param_names_mapping(
names: str | set[str] | dict[str, str] | None = None,
) -> dict[str, str]:
"""Take a parameter or a collection of parameter names
and turn it into a mapping from existing parameter names to new ones if necessary.
It follows the same rule as `_get_dataset_names_mapping` and
prefixes the keys on the resultant dictionary with `params:` to comply with node's syntax.
Args:
names: A parameter name or collection of parameter names.
When str or set[str] is provided, the listed names will stay
the same as they are named in the provided pipeline.
When dict[str, str] is provided, current names will be
mapped to new names in the resultant pipeline.
Returns:
A dictionary that maps the old parameter names to the provided ones.
Examples:
>>> _get_param_names_mapping("param_name")
{"params:param_name": "params:param_name"} # a str name will stay the same
>>> _get_param_names_mapping(set(["param_1", "param_2"]))
# a set[str] of names will stay the same
{"params:param_1": "params:param_1", "params:param_2": "params:param_2"}
>>> _get_param_names_mapping({"param_1": "new_name_for_param_1"})
    # a dict[str, str] of names will map key to value
{"params:param_1": "params:new_name_for_param_1"}
"""
params = {}
for name, new_name in _get_dataset_names_mapping(names).items():
if _is_all_parameters(name):
params[name] = name # don't map parameters into params:parameters
else:
param_name = _normalize_param_name(name)
param_new_name = _normalize_param_name(new_name)
params[param_name] = param_new_name
return params
def pipeline( # noqa: PLR0913
pipe: Iterable[Node | Pipeline] | Pipeline,
*,
inputs: str | set[str] | dict[str, str] | None = None,
outputs: str | set[str] | dict[str, str] | None = None,
parameters: str | set[str] | dict[str, str] | None = None,
tags: str | Iterable[str] | None = None,
namespace: str | None = None,
) -> Pipeline:
r"""Create a ``Pipeline`` from a collection of nodes and/or ``Pipeline``\s.
Args:
pipe: The nodes the ``Pipeline`` will be made of. If you
provide pipelines among the list of nodes, those pipelines will
be expanded and all their nodes will become part of this
new pipeline.
inputs: A name or collection of input names to be exposed as connection points
to other pipelines upstream. This is optional; if not provided, the
pipeline inputs are automatically inferred from the pipeline structure.
When str or set[str] is provided, the listed input names will stay
the same as they are named in the provided pipeline.
When dict[str, str] is provided, current input names will be
mapped to new names.
Must only refer to the pipeline's free inputs.
outputs: A name or collection of names to be exposed as connection points
to other pipelines downstream. This is optional; if not provided, the
            pipeline outputs are automatically inferred from the pipeline structure.
When str or set[str] is provided, the listed output names will stay
the same as they are named in the provided pipeline.
When dict[str, str] is provided, current output names will be
mapped to new names.
Can refer to both the pipeline's free outputs, as well as
intermediate results that need to be exposed.
parameters: A name or collection of parameters to namespace.
When str or set[str] are provided, the listed parameter names will stay
the same as they are named in the provided pipeline.
When dict[str, str] is provided, current parameter names will be
mapped to new names.
The parameters can be specified without the `params:` prefix.
tags: Optional set of tags to be applied to all the pipeline nodes.
namespace: A prefix to give to all dataset names,
except those explicitly named with the `inputs`/`outputs`
arguments, and parameter references (`params:` and `parameters`).
Raises:
ModularPipelineError: When inputs, outputs or parameters are incorrectly
specified, or they do not exist on the original pipeline.
ValueError: When underlying pipeline nodes inputs/outputs are not
any of the expected types (str, dict, list, or None).
Returns:
A new ``Pipeline`` object.
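
    Example:
        An illustrative sketch (the node function and dataset names are
        hypothetical):

        >>> from kedro.pipeline import node, pipeline
        >>> base = pipeline([node(lambda x: x, "raw", "model_input", name="prep")])
        >>> ds = pipeline(
        ...     base,
        ...     inputs={"raw": "raw_2024"},  # remap the free input explicitly
        ...     namespace="data_science",  # prefix everything else
        ... )
        >>> sorted(ds.inputs())
        ['raw_2024']
        >>> sorted(ds.outputs())
        ['data_science.model_input']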
"""
if isinstance(pipe, Pipeline):
# To ensure that we are always dealing with a *copy* of pipe.
pipe = Pipeline([pipe], tags=tags)
else:
pipe = Pipeline(pipe, tags=tags)
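    # Nothing to remap or namespace: return the copied pipeline unchanged.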
if not any([inputs, outputs, parameters, namespace]):
return pipe
inputs = _get_dataset_names_mapping(inputs)
outputs = _get_dataset_names_mapping(outputs)
parameters = _get_param_names_mapping(parameters)
_validate_datasets_exist(inputs.keys(), outputs.keys(), parameters.keys(), pipe)
_validate_inputs_outputs(inputs.keys(), outputs.keys(), pipe)
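    # Single rename table covering inputs, outputs and parameters alike.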
mapping = {**inputs, **outputs, **parameters}
def _prefix_dataset(name: str) -> str:
return f"{namespace}.{name}"
def _prefix_param(name: str) -> str:
_, param_name = name.split("params:")
return f"params:{namespace}.{param_name}"
def _is_transcode_base_in_mapping(name: str) -> bool:
base_name, _ = _transcode_split(name)
return base_name in mapping
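    # Example: with a (hypothetical) mapping {"df": "new_df"}, the transcoded
    # name "df@spark" is rewritten to "new_df@spark".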
def _map_transcode_base(name: str) -> str:
base_name, transcode_suffix = _transcode_split(name)
return TRANSCODING_SEPARATOR.join((mapping[base_name], transcode_suffix))
def _rename(name: str) -> str:
rules = [
# if name mapped to new name, update with new name
(lambda n: n in mapping, lambda n: mapping[n]),
# if name refers to the set of all "parameters", leave as is
(_is_all_parameters, lambda n: n),
# if transcode base is mapped to a new name, update with new base
(_is_transcode_base_in_mapping, _map_transcode_base),
# if name refers to a single parameter and a namespace is given, apply prefix
(lambda n: bool(namespace) and _is_single_parameter(n), _prefix_param),
# if namespace given for a dataset, prefix name using that namespace
(lambda n: bool(namespace), _prefix_dataset),
]
for predicate, processor in rules:
if predicate(name): # type: ignore[no-untyped-call]
processor_name: str = processor(name) # type: ignore[no-untyped-call]
return processor_name
# leave name as is
return name
def _process_dataset_names(
datasets: str | list[str] | dict[str, str] | None,
) -> str | list[str] | dict[str, str] | None:
if datasets is None:
return None
if isinstance(datasets, str):
return _rename(datasets)
if isinstance(datasets, list):
return [_rename(name) for name in datasets]
if isinstance(datasets, dict):
return {key: _rename(value) for key, value in datasets.items()}
raise ValueError( # pragma: no cover
f"Unexpected input {datasets} of type {type(datasets)}"
)
def _copy_node(node: Node) -> Node:
new_namespace = node.namespace
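        # Nest any existing node namespace under the outer one,
        # e.g. namespace="outer" and node.namespace="inner" -> "outer.inner".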
if namespace:
new_namespace = (
f"{namespace}.{node.namespace}" if node.namespace else namespace
)
node_copy: Node = node._copy(
inputs=_process_dataset_names(node._inputs),
outputs=_process_dataset_names(node._outputs),
namespace=new_namespace,
)
return node_copy
new_nodes = [_copy_node(n) for n in pipe.nodes]
return Pipeline(new_nodes, tags=tags)