Deployment with Prefect¶
This page explains how to run your Kedro pipeline using Prefect Core, an open source workflow management system.
In scope of this deployment we are interested in Prefect Server which is an open-source backend that makes it easy to monitor and execute your Prefect flows and automatically extends the Prefect Core.
Note
Prefect Server ships out-of-the-box with a fully featured user interface.
Prerequisites¶
To use Prefect Core and Prefect Server, make sure you have the following prerequisites in place:
Prefect Core is installed on your machine
Docker and Docker Compose are installed and Docker Engine is running
Prefect Server is up and running
PREFECT__LOGGING__EXTRA_LOGGERS
environment variable is set (it is required to get Kedro logs emitted):
export PREFECT__LOGGING__EXTRA_LOGGERS="['kedro']"
How to run your Kedro pipeline using Prefect¶
Convert your Kedro pipeline to Prefect flow¶
To build a Prefect flow for your Kedro pipeline programmatically and register it with the Prefect API, use the following Python script, which should be stored in your project’s root directory:
# <project_root>/register_prefect_flow.py
from pathlib import Path
import click
from prefect import Client, Flow, Task
from prefect.utilities.exceptions import ClientError
from kedro.framework.project import pipelines
from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project
from kedro.io import DataCatalog, MemoryDataSet
from kedro.pipeline.node import Node
from kedro.runner import run_node
class KedroTask(Task):
"""Kedro node as a Prefect task."""
def __init__(self, node: Node, catalog: DataCatalog) -> None:
self._node = node
self._catalog = catalog
super().__init__(name=node.name, tags=node.tags)
def run(self):
run_node(self._node, self._catalog)
@click.command()
@click.option("-p", "--pipeline", "pipeline_name", default=None)
@click.option("--env", "-e", type=str, default=None)
def build_and_register_flow(pipeline_name, env):
"""Register a Kedro pipeline as a Prefect flow."""
project_path = Path.cwd()
metadata = bootstrap_project(project_path)
session = KedroSession.create(project_path=project_path, env=env)
context = session.load_context()
catalog = context.catalog
pipeline_name = pipeline_name or "__default__"
pipeline = pipelines.get(pipeline_name)
unregistered_ds = pipeline.data_sets() - set(catalog.list())
for ds_name in unregistered_ds:
catalog.add(ds_name, MemoryDataSet())
flow = Flow(metadata.project_name)
tasks = {}
for node, parent_nodes in pipeline.node_dependencies.items():
if node._unique_key not in tasks:
node_task = KedroTask(node, catalog)
tasks[node._unique_key] = node_task
else:
node_task = tasks[node._unique_key]
parent_tasks = []
for parent in parent_nodes:
if parent._unique_key not in tasks:
parent_task = KedroTask(parent, catalog)
tasks[parent._unique_key] = parent_task
else:
parent_task = tasks[parent._unique_key]
parent_tasks.append(parent_task)
flow.set_dependencies(task=node_task, upstream_tasks=parent_tasks)
client = Client()
try:
client.create_project(project_name=metadata.project_name)
except ClientError:
# `metadata.project_name` project already exists
pass
# Register the flow with the server
flow.register(project_name=metadata.project_name)
# Start a local agent that can communicate between the server
# and your flow code
flow.run_agent()
if __name__ == "__main__":
build_and_register_flow()
Note
The script launches a local agent. Remember to stop the agent with Ctrl-C when you complete.
Run Prefect flow¶
Now, having the flow registered, you can use Prefect UI to orchestrate and monitor it.
Navigate to http://localhost:8080/default?flows= to see your registered flow.
Click on the flow to open it and then trigger your flow using the “RUN”/”QUICK RUN” button.