Deployment with Prefect

This page explains how to run your Kedro pipeline using Prefect Core, an open-source workflow management system.

For this deployment, we use Prefect Server, an open-source backend that extends Prefect Core and makes it easy to monitor and execute your Prefect flows.

Note

Prefect Server ships out-of-the-box with a fully featured user interface.

Prerequisites

To use Prefect Core and Prefect Server, make sure you have the following prerequisites in place:

  • Prefect Core is installed on your machine

  • Docker and Docker Compose are installed and Docker Engine is running

  • Prefect Server is up and running

  • The PREFECT__LOGGING__EXTRA_LOGGERS environment variable is set (this is required for Kedro logs to be emitted):

export PREFECT__LOGGING__EXTRA_LOGGERS="['kedro']"
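Prefect Core reads configuration overrides from environment variables whose names start with PREFECT__. Each double underscore separates one level of the nested configuration, so the variable above overrides the logging.extra_loggers setting. The sketch below imitates that mapping using only the standard library. parse_prefect_env is a hypothetical helper, not part of Prefect. Lowercasing the keys and interpreting values with ast.literal_eval (so that "['kedro']" becomes a Python list) are assumptions made for illustration and are not a description of Prefect's actual loader.

```python
import ast
from typing import Any, Dict, Mapping


def parse_prefect_env(environ: Mapping[str, str], prefix: str = "PREFECT") -> Dict[str, Any]:
    """Turn PREFECT__A__B=value variables into a nested {"a": {"b": value}} dict.

    Hypothetical helper for illustration; it mimics, but is not, Prefect's loader.
    """
    config: Dict[str, Any] = {}
    marker = prefix + "__"
    for name, raw_value in environ.items():
        if not name.startswith(marker):
            continue  # ignore unrelated variables such as HOME
        # "PREFECT__LOGGING__EXTRA_LOGGERS" -> ["logging", "extra_loggers"]
        keys = name[len(marker):].lower().split("__")
        try:
            # Assumption: literals such as "['kedro']" or "10" are evaluated
            value: Any = ast.literal_eval(raw_value)
        except (ValueError, SyntaxError):
            value = raw_value  # fall back to the plain string, e.g. "INFO"
        section = config
        for key in keys[:-1]:
            section = section.setdefault(key, {})
        section[keys[-1]] = value
    return config


if __name__ == "__main__":
    env = {"PREFECT__LOGGING__EXTRA_LOGGERS": "['kedro']", "HOME": "/home/user"}
    print(parse_prefect_env(env))  # {'logging': {'extra_loggers': ['kedro']}}
```

Passing os.environ instead of the sample dictionary is a quick way to check that the variable is set in the shell you use to run the registration script.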

How to run your Kedro pipeline using Prefect

Convert your Kedro pipeline to Prefect flow

To build a Prefect flow for your Kedro pipeline programmatically and register it with the Prefect API, use the following Python script, which should be stored in your project’s root directory:

# <project_root>/register_prefect_flow.py
from pathlib import Path

import click

from prefect import Client, Flow, Task
from prefect.exceptions import ClientError

from kedro.framework.project import pipelines
from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project
from kedro.io import DataCatalog, MemoryDataSet
from kedro.pipeline.node import Node
from kedro.runner import run_node


class KedroTask(Task):
    """Kedro node as a Prefect task."""

    def __init__(self, node: Node, catalog: DataCatalog) -> None:
        self._node = node
        self._catalog = catalog
        super().__init__(name=node.name, tags=node.tags)

    def run(self):
        run_node(self._node, self._catalog)


@click.command()
@click.option("-p", "--pipeline", "pipeline_name", default=None)
@click.option("--env", "-e", type=str, default=None)
def build_and_register_flow(pipeline_name, env):
    """Register a Kedro pipeline as a Prefect flow."""
    project_path = Path.cwd()
    metadata = bootstrap_project(project_path)

    session = KedroSession.create(project_path=project_path, env=env)
    context = session.load_context()

    catalog = context.catalog
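    pipeline_name = pipeline_name or "__default__"
    pipeline = pipelines.get(pipeline_name)
    if pipeline is None:
        # Fail early with a clear CLI error instead of an AttributeError below
        raise click.BadParameter(
            f"Pipeline '{pipeline_name}' not found.", param_hint="--pipeline"
        )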

    unregistered_ds = pipeline.data_sets() - set(catalog.list())
    for ds_name in unregistered_ds:
        catalog.add(ds_name, MemoryDataSet())

    flow = Flow(metadata.project_name)

    tasks = {}
    for node, parent_nodes in pipeline.node_dependencies.items():
        if node._unique_key not in tasks:
            node_task = KedroTask(node, catalog)
            tasks[node._unique_key] = node_task
        else:
            node_task = tasks[node._unique_key]

        parent_tasks = []

        for parent in parent_nodes:
            if parent._unique_key not in tasks:
                parent_task = KedroTask(parent, catalog)
                tasks[parent._unique_key] = parent_task
            else:
                parent_task = tasks[parent._unique_key]

            parent_tasks.append(parent_task)

        flow.set_dependencies(task=node_task, upstream_tasks=parent_tasks)

    client = Client()
    try:
        client.create_project(project_name=metadata.project_name)
    except ClientError:
        # `metadata.project_name` project already exists
        pass

    # Register the flow with the server
    flow.register(project_name=metadata.project_name)

    # Start a local agent that can communicate between the server
    # and your flow code
    flow.run_agent()


if __name__ == "__main__":
    build_and_register_flow()
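Before it talks to Prefect, the script performs two plain data transformations. First, every dataset that the pipeline uses but the catalog does not define is backed by a MemoryDataSet. Second, every Kedro node becomes exactly one Prefect task whose upstream tasks are the tasks of its parent nodes. The tasks dictionary keyed by the node's unique key ensures that a node shared by several children is wrapped only once. The sketch below reproduces both steps with standard-library types so it runs without Kedro or Prefect. All function names in it are hypothetical: node names stand in for Node objects, a dict of sets stands in for Pipeline.node_dependencies, and graphlib.TopologicalSorter (Python 3.9+) computes one order in which the resulting tasks could execute.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set


def unregistered_datasets(pipeline_datasets: Set[str], catalog_entries: Set[str]) -> Set[str]:
    """Datasets the script would back with MemoryDataSet (the same set difference)."""
    return pipeline_datasets - catalog_entries


def build_upstream_map(node_dependencies: Dict[str, Set[str]]) -> Dict[str, List[str]]:
    """Map each node (one task per node) to a sorted list of its upstream nodes."""
    tasks: Dict[str, List[str]] = {}
    for node, parents in node_dependencies.items():
        tasks[node] = sorted(parents)  # upstream tasks of this node's task
        for parent in parents:
            # setdefault never overwrites, mirroring "create each task only once"
            tasks.setdefault(parent, [])
    return tasks


def execution_order(upstream: Dict[str, List[str]]) -> List[str]:
    """One valid order in which the flow could run its tasks (raises on cycles)."""
    return list(TopologicalSorter(upstream).static_order())


if __name__ == "__main__":
    deps = {
        "preprocess": set(),
        "train": {"preprocess"},
        "evaluate": {"train", "preprocess"},
    }
    print(sorted(unregistered_datasets({"raw", "features", "model"}, {"raw"})))
    # ['features', 'model']
    print(execution_order(build_upstream_map(deps)))
    # ['preprocess', 'train', 'evaluate']
```

Because Kedro pipelines are acyclic, the upstream map always admits such an order; Prefect then uses the same dependency information to decide which tasks may run once their upstream tasks have finished.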

Note

The script starts a local agent and keeps it running. Stop the agent with Ctrl-C when you are done.

Run Prefect flow

Now that the flow is registered, you can use the Prefect UI to orchestrate and monitor it.

Navigate to http://localhost:8080/default?flows= to see your registered flow.

[Image: registered flows in the Prefect UI (prefect_flows.png)]

Click the flow to open it, then trigger it using the “RUN” or “QUICK RUN” button.

[Image: flow details page in the Prefect UI (prefect_flow_details.png)]