kedro.pipeline.Pipeline

class kedro.pipeline.Pipeline(nodes, *, name=None, tags=None)[source]

Bases: object

A Pipeline defined as a collection of Node objects. This class treats nodes as part of a graph representation and provides inputs, outputs and execution order.

Attributes

Pipeline.grouped_nodes Return a list of the pipeline nodes in topologically ordered groups, i.e. if node A needs to be run before node B, it will appear in an earlier group.
Pipeline.name (DEPRECATED, use Pipeline.tag method instead) Get the pipeline name.
Pipeline.node_dependencies All dependencies of nodes where the first Node has a direct dependency on the second Node.
Pipeline.nodes Return a list of the pipeline nodes in topological order, i.e. if node A needs to be run before node B, it will appear earlier in the list.

Methods

Pipeline.__init__(nodes, *[, name, tags]) Initialise Pipeline with a list of Node instances.
Pipeline.all_inputs() All inputs for all nodes in the pipeline.
Pipeline.all_outputs() All outputs of all nodes in the pipeline.
Pipeline.data_sets() The names of all data sets used by the Pipeline, including inputs and outputs.
Pipeline.decorate(*decorators) Create a new Pipeline by applying the provided decorators to all the nodes in the pipeline.
Pipeline.describe([names_only]) Obtain the order of execution and expected free input variables in a loggable pre-formatted string.
Pipeline.from_inputs(*inputs) Create a new Pipeline object with the nodes which depend directly or transitively on the provided inputs.
Pipeline.from_nodes(*node_names) Create a new Pipeline object with the nodes which depend directly or transitively on the provided nodes.
Pipeline.inputs() The names of free inputs that must be provided at runtime so that the pipeline is runnable.
Pipeline.only_nodes(*node_names) Create a new Pipeline which will contain only the specified nodes by name.
Pipeline.only_nodes_with_inputs(*inputs) Create a new Pipeline object with the nodes which depend directly on the provided inputs.
Pipeline.only_nodes_with_outputs(*outputs) Create a new Pipeline object with the nodes which are directly required to produce the provided outputs.
Pipeline.only_nodes_with_tags(*tags) Create a new Pipeline object with the nodes which contain any of the provided tags.
Pipeline.outputs() The names of outputs produced when the whole pipeline is run.
Pipeline.tag(tags) Return a copy of the pipeline, with each node tagged accordingly.
Pipeline.to_json() Return a json representation of the pipeline.
Pipeline.to_nodes(*node_names) Create a new Pipeline object with the nodes required directly or transitively by the provided nodes.
Pipeline.to_outputs(*outputs) Create a new Pipeline object with the nodes which are directly or transitively required to produce the provided outputs.
Pipeline.transform([datasets, prefix]) Create a copy of the pipeline and its nodes, with some dataset names modified.
__init__(nodes, *, name=None, tags=None)[source]

Initialise Pipeline with a list of Node instances.

Parameters:
  • nodes (Iterable[Union[Node, Pipeline]]) – The list of nodes the Pipeline will be made of. If you provide pipelines among the list of nodes, those pipelines will be expanded and all their nodes will become part of this new pipeline.
  • name (Optional[str]) – (DEPRECATED, use the Pipeline.tag method instead) The name of the pipeline. If specified, this name will be used to tag all of the nodes in the pipeline.
  • tags (Union[str, Iterable[str], None]) – Optional set of tags to be applied to all the pipeline nodes.
Raises:
  • ValueError – When an empty list of nodes is provided, or when not all nodes have unique names.
  • CircularDependencyError – When visiting all the nodes is not possible due to the existence of a circular dependency.
  • OutputNotUniqueError – When multiple Node instances produce the same output.
  • ConfirmNotUniqueError – When multiple Node instances attempt to confirm the same dataset.

Example:

from kedro.pipeline import Pipeline, node

# In the following scenario, first_ds and second_ds are data sets
# provided by io. The Pipeline passes these data sets to the
# first_node function and provides its result to second_node as input.

def first_node(first_ds, second_ds):
    return dict(third_ds=first_ds+second_ds)

def second_node(third_ds):
    return third_ds

pipeline = Pipeline([
    # first_node returns a dict, so its outputs are declared as a dict too
    node(first_node, ['first_ds', 'second_ds'], dict(third_ds='third_ds')),
    node(second_node, dict(third_ds='third_ds'), 'fourth_ds')])

pipeline.describe()
all_inputs()[source]

All inputs for all nodes in the pipeline.

Return type:Set[str]
Returns:All node input names as a Set.
all_outputs()[source]

All outputs of all nodes in the pipeline.

Return type:Set[str]
Returns:All node outputs.
data_sets()[source]

The names of all data sets used by the Pipeline, including inputs and outputs.

Return type:Set[str]
Returns:The set of all pipeline data sets.
decorate(*decorators)[source]

Create a new Pipeline by applying the provided decorators to all the nodes in the pipeline. If no decorators are passed, it will return a copy of the current Pipeline object.

Parameters:decorators (Callable) – Decorators to be applied on all node functions in the pipeline. Decorators will be applied from right to left.
Return type:Pipeline
Returns:A new Pipeline object with all nodes decorated with the provided decorators.
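The "right to left" ordering can be pictured with plain functions. The following is a minimal sketch in plain Python and does not use Kedro; apply_right_to_left, add_one, double and identity are hypothetical names introduced only for illustration. It assumes that "right to left" means the rightmost decorator wraps the function first, as with stacked @-decorators.

```python
from functools import reduce


def apply_right_to_left(func, *decorators):
    """Wrap func so that the rightmost decorator is applied first (innermost)."""
    return reduce(lambda wrapped, dec: dec(wrapped), reversed(decorators), func)


def add_one(f):
    def wrapper(x):
        return f(x) + 1
    return wrapper


def double(f):
    def wrapper(x):
        return f(x) * 2
    return wrapper


def identity(x):
    return x


# Equivalent to double(add_one(identity)): add_one sits closest to the function.
decorated = apply_right_to_left(identity, double, add_one)
result = decorated(3)  # (3 + 1) * 2
```

Swapping the two decorators changes the result, which is why the order in which they are passed matters.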

describe(names_only=True)[source]

Obtain the order of execution and expected free input variables in a loggable pre-formatted string. The order of nodes matches the order of execution given by the topological sort.

Parameters:names_only (bool) – A flag controlling whether the pipeline is described using only node names.

Example:

import logging

pipeline = Pipeline([ ... ])

logger = logging.getLogger(__name__)

logger.info(pipeline.describe())

After invocation the following will be printed as an info level log statement:

#### Pipeline execution order ####
Inputs: C, D

func1([C]) -> [A]
func2([D]) -> [B]
func3([A, D]) -> [E]

Outputs: B, E
##################################
Return type:str
Returns:The pipeline description as a formatted string.
from_inputs(*inputs)[source]

Create a new Pipeline object with the nodes which depend directly or transitively on the provided inputs. If provided a name, but no format, for a transcoded input, it includes all the nodes that use inputs with that name, otherwise it matches to the fully-qualified name only (i.e. name@format).

Parameters:inputs (str) – A list of inputs which should be used as a starting point of the new Pipeline.
Raises:ValueError – Raised when any of the given inputs do not exist in the Pipeline object.
Return type:Pipeline
Returns:A new Pipeline object containing only those nodes of the current one that depend directly or transitively on the provided inputs.
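The difference between "directly" and "directly or transitively" can be sketched without Kedro. The example below is an illustration only, not Kedro's implementation: nodes are modelled as a plain dict of (inputs, outputs) pairs taken from the describe() example, the two helper functions are hypothetical stand-ins that return node names rather than Pipeline objects, and transcoding is ignored.

```python
# Node name -> (input names, output names), mirroring the describe() example.
NODES = {
    "func1": (["C"], ["A"]),
    "func2": (["D"], ["B"]),
    "func3": (["A", "D"], ["E"]),
}


def nodes_with_inputs(nodes, *inputs):
    """Names of nodes that consume at least one of the given inputs directly."""
    wanted = set(inputs)
    return {name for name, (ins, _) in nodes.items() if wanted & set(ins)}


def nodes_from_inputs(nodes, *inputs):
    """Names of nodes that depend directly or transitively on the given inputs."""
    selected = set()
    frontier = set(inputs)
    while frontier:
        newly = {
            name
            for name, (ins, _) in nodes.items()
            if name not in selected and frontier & set(ins)
        }
        selected |= newly
        # Follow the outputs of newly selected nodes to reach their consumers.
        frontier = {out for name in newly for out in nodes[name][1]}
    return selected


direct = nodes_with_inputs(NODES, "C")      # only func1 reads C
transitive = nodes_from_inputs(NODES, "C")  # func3 reads A, which func1 produces
```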
from_nodes(*node_names)[source]

Create a new Pipeline object with the nodes which depend directly or transitively on the provided nodes.

Parameters:node_names (str) – A list of node_names which should be used as a starting point of the new Pipeline.
Raises:ValueError – Raised when any of the given names do not exist in the Pipeline object.
Return type:Pipeline
Returns:A new Pipeline object containing only those nodes of the current one that depend directly or transitively on the provided nodes.
grouped_nodes

Return a list of the pipeline nodes in topologically ordered groups, i.e. if node A needs to be run before node B, it will appear in an earlier group.

Return type:List[Set[Node]]
Returns:The pipeline nodes in topologically ordered groups.
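One way to picture topologically ordered groups is to repeatedly peel off the nodes whose dependencies have all been satisfied. This is a minimal plain-Python sketch, not Kedro's implementation; group_topologically is a hypothetical helper, nodes are represented by their names, and a plain ValueError stands in for Kedro's CircularDependencyError.

```python
def group_topologically(dependencies):
    """Return a list of sets; each set depends only on nodes in earlier sets.

    `dependencies` maps each node name to the set of node names it depends on.
    """
    remaining = {node: set(parents) for node, parents in dependencies.items()}
    groups = []
    while remaining:
        # Nodes with no unmet dependencies can all go into the next group.
        ready = {node for node, parents in remaining.items() if not parents}
        if not ready:
            raise ValueError("circular dependency detected")
        groups.append(ready)
        remaining = {
            node: parents - ready
            for node, parents in remaining.items()
            if node not in ready
        }
    return groups


# func3 depends on func1; func1 and func2 are independent of each other.
dependencies = {"func1": set(), "func2": set(), "func3": {"func1"}}
groups = group_topologically(dependencies)
```

Here func1 and func2 land in the first group and func3 in the second, so nodes within one group have no ordering constraints between them.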
inputs()[source]

The names of free inputs that must be provided at runtime so that the pipeline is runnable. Does not include intermediate inputs which are produced and consumed by the inner pipeline nodes. Resolves transcoded names where necessary.

Return type:Set[str]
Returns:The set of free input names needed by the pipeline.
name

(DEPRECATED, use Pipeline.tag method instead) Get the pipeline name.

Return type:Optional[str]
Returns:The name of the pipeline as provided in the constructor.
node_dependencies

All dependencies of nodes where the first Node has a direct dependency on the second Node.

Return type:Dict[Node, Set[Node]]
Returns:Dictionary where keys are nodes and values are sets made up of their parent nodes. Nodes without dependencies map to an empty set.
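The shape of this mapping can be illustrated by deriving it from node inputs and outputs. The sketch below is plain Python and not Kedro's implementation; it uses node names in place of Node objects, reuses the nodes from the describe() example, and build_dependencies is a hypothetical helper.

```python
# Node name -> (input names, output names), mirroring the describe() example.
NODES = {
    "func1": (["C"], ["A"]),
    "func2": (["D"], ["B"]),
    "func3": (["A", "D"], ["E"]),
}


def build_dependencies(nodes):
    """Map each node name to the set of node names that produce its inputs."""
    producer = {out: name for name, (_, outs) in nodes.items() for out in outs}
    return {
        name: {producer[i] for i in ins if i in producer}
        for name, (ins, _) in nodes.items()
    }


deps = build_dependencies(NODES)
```

In this example func3 maps to {"func1"} because it consumes A, while func1 and func2 map to empty sets because C and D are not produced by any node.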
nodes

Return a list of the pipeline nodes in topological order, i.e. if node A needs to be run before node B, it will appear earlier in the list.

Return type:List[Node]
Returns:The list of all pipeline nodes in topological order.
only_nodes(*node_names)[source]

Create a new Pipeline which will contain only the specified nodes by name.

Parameters:node_names (str) – One or more node names. The returned Pipeline will only contain these nodes.
Raises:ValueError – When some invalid node name is given.
Return type:Pipeline
Returns:A new Pipeline containing only the specified nodes.
only_nodes_with_inputs(*inputs)[source]

Create a new Pipeline object with the nodes which depend directly on the provided inputs. If provided a name, but no format, for a transcoded input, it includes all the nodes that use inputs with that name, otherwise it matches to the fully-qualified name only (i.e. name@format).

Parameters:inputs (str) – A list of inputs which should be used as a starting point of the new Pipeline.
Raises:ValueError – Raised when any of the given inputs do not exist in the Pipeline object.
Return type:Pipeline
Returns:A new Pipeline object containing only those nodes of the current one that depend directly on the provided inputs.
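The name@format matching rule for transcoded data sets can be sketched as a small predicate. This is an illustration in plain Python, not Kedro's implementation; matches_dataset is a hypothetical helper, and the data set names are made up.

```python
def matches_dataset(requested, actual):
    """Return True if `actual` satisfies `requested` under the name@format rule."""
    if "@" in requested:
        # Fully-qualified request: only the exact name@format matches.
        return requested == actual
    # Bare name: match the name regardless of any @format suffix.
    return actual.split("@")[0] == requested


any_format = matches_dataset("data", "data@spark")
exact_only = matches_dataset("data@pandas", "data@spark")
```

Under this rule, asking for "data" picks up nodes using "data@pandas" or "data@spark", while asking for "data@pandas" picks up only nodes using exactly that name.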
only_nodes_with_outputs(*outputs)[source]

Create a new Pipeline object with the nodes which are directly required to produce the provided outputs. If provided a name, but no format, for a transcoded dataset, it includes all the nodes that output to that name, otherwise it matches to the fully-qualified name only (i.e. name@format).

Parameters:outputs (str) – A list of outputs which should be the final outputs of the new Pipeline.
Raises:ValueError – Raised when any of the given outputs do not exist in the Pipeline object.
Return type:Pipeline
Returns:A new Pipeline object containing only those nodes of the current one that are directly required to produce the provided outputs.
only_nodes_with_tags(*tags)[source]

Create a new Pipeline object with the nodes which contain any of the provided tags. The resulting Pipeline is empty if no tags are provided.

Parameters:tags (str) – A list of node tags which should be used to look up the nodes of the new Pipeline.
Return type:Pipeline
Returns:A new Pipeline object containing only those nodes of the current one that have any of the provided tags.
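The "any of the provided tags" rule, including the empty result when no tags are given, can be sketched as a set intersection. This is plain Python rather than Kedro code; the tag names and the nodes_with_tags helper are hypothetical.

```python
# Node name -> set of tags attached to that node (made-up tags).
NODE_TAGS = {
    "func1": {"preprocessing"},
    "func2": {"preprocessing", "reporting"},
    "func3": set(),
}


def nodes_with_tags(node_tags, *tags):
    """Names of nodes carrying at least one of the given tags."""
    wanted = set(tags)
    return {name for name, own in node_tags.items() if wanted & own}


reporting = nodes_with_tags(NODE_TAGS, "reporting")
either = nodes_with_tags(NODE_TAGS, "preprocessing", "reporting")
nothing = nodes_with_tags(NODE_TAGS)  # no tags requested, so nothing matches
```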
outputs()[source]

The names of outputs produced when the whole pipeline is run. Does not include intermediate outputs that are consumed by other pipeline nodes. Resolves transcoded names where necessary.

Return type:Set[str]
Returns:The set of final pipeline outputs.
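The free inputs and final outputs of a pipeline can be pictured as set differences between everything the nodes consume and everything they produce. The sketch below is plain Python, not Kedro's implementation; it reuses the nodes from the describe() example and ignores transcoding.

```python
# Node name -> (input names, output names), mirroring the describe() example.
NODES = {
    "func1": (["C"], ["A"]),
    "func2": (["D"], ["B"]),
    "func3": (["A", "D"], ["E"]),
}

all_inputs = {name for ins, _ in NODES.values() for name in ins}
all_outputs = {name for _, outs in NODES.values() for name in outs}

# Free inputs are consumed but never produced inside the pipeline.
free_inputs = all_inputs - all_outputs
# Final outputs are produced but never consumed inside the pipeline.
final_outputs = all_outputs - all_inputs
# Every data set touched by the pipeline, whether read or written.
data_sets = all_inputs | all_outputs
```

The results agree with the describe() example: the free inputs are C and D, the final outputs are B and E, and A is an intermediate data set that appears in neither.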
tag(tags)[source]

Return a copy of the pipeline, with each node tagged accordingly.

Parameters:tags (Union[str, Iterable[str]]) – The tags to be added to the nodes.
Return type:Pipeline
Returns:New Pipeline object.

to_json()[source]

Return a json representation of the pipeline.

to_nodes(*node_names)[source]

Create a new Pipeline object with the nodes required directly or transitively by the provided nodes.

Parameters:node_names (str) – A list of node_names which should be used as an end point of the new Pipeline.
Raises:ValueError – Raised when any of the given names do not exist in the Pipeline object.
Return type:Pipeline
Returns:A new Pipeline object containing only those nodes of the current one that are required directly or transitively by the provided nodes.
to_outputs(*outputs)[source]

Create a new Pipeline object with the nodes which are directly or transitively required to produce the provided outputs. If provided a name, but no format, for a transcoded dataset, it includes all the nodes that output to that name, otherwise it matches to the fully-qualified name only (i.e. name@format).

Parameters:outputs (str) – A list of outputs which should be the final outputs of the new Pipeline.
Raises:ValueError – Raised when any of the given outputs do not exist in the Pipeline object.
Return type:Pipeline
Returns:A new Pipeline object containing only those nodes of the current one that are directly or transitively required to produce the provided outputs.
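Selecting the nodes required for a set of outputs amounts to walking the graph backwards from those outputs. The following plain-Python sketch is an illustration only, not Kedro's implementation; nodes_to_outputs is a hypothetical helper that returns node names rather than a Pipeline, and transcoding is ignored.

```python
# Node name -> (input names, output names), mirroring the describe() example.
NODES = {
    "func1": (["C"], ["A"]),
    "func2": (["D"], ["B"]),
    "func3": (["A", "D"], ["E"]),
}


def nodes_to_outputs(nodes, *outputs):
    """Names of nodes directly or transitively required to produce the outputs."""
    selected = set()
    needed = set(outputs)
    while needed:
        newly = {
            name
            for name, (_, outs) in nodes.items()
            if name not in selected and needed & set(outs)
        }
        selected |= newly
        # The inputs of newly selected nodes must be produced in turn.
        needed = {i for name in newly for i in nodes[name][0]}
    return selected


for_e = nodes_to_outputs(NODES, "E")  # func3 needs A, which func1 produces
for_b = nodes_to_outputs(NODES, "B")  # func2 reads only the free input D
```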
transform(datasets=None, prefix=None)[source]

Create a copy of the pipeline and its nodes, with some dataset names modified.

Parameters:
  • datasets (Optional[Dict[str, str]]) – A map of the existing dataset name to the new one. Both input and output datasets can be replaced this way.
  • prefix (Optional[str]) – A prefix to give to all dataset names, except those explicitly named with the datasets parameter, and parameter references (params: and parameters).
Raises:ValueError – When invalid dataset names are given.
Return type:Pipeline
Returns:A new Pipeline object with the new nodes, modified as requested.
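The interplay between datasets and prefix can be sketched as a per-name renaming rule. This is a plain-Python illustration, not Kedro's implementation; transform_name is a hypothetical helper, and joining the prefix and the name with "." is an assumption made for this example, since the separator is not specified here.

```python
def transform_name(name, datasets=None, prefix=None):
    """Return the new name for one dataset under the described rules."""
    datasets = datasets or {}
    if name in datasets:
        # Explicitly mapped names take the mapped value and skip the prefix.
        return datasets[name]
    if name == "parameters" or name.startswith("params:"):
        # Parameter references are left untouched.
        return name
    if prefix:
        # Assumption: prefix and name are joined with a dot.
        return f"{prefix}.{name}"
    return name


renamed = transform_name("raw", datasets={"raw": "clean"}, prefix="ns")
prefixed = transform_name("model", prefix="ns")
untouched = transform_name("params:learning_rate", prefix="ns")
```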