Source code for kedro.framework.cli.pipeline

# Copyright 2021 QuantumBlack Visual Analytics Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, AND
# NONINFRINGEMENT. IN NO EVENT WILL THE LICENSOR OR OTHER CONTRIBUTORS
# BE LIABLE FOR ANY CLAIM, DAMAGES, OR OTHER LIABILITY, WHETHER IN AN
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF, OR IN
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#
# The QuantumBlack Visual Analytics Limited ("QuantumBlack") name and logo
# (either separately or in combination, "QuantumBlack Trademarks") are
# trademarks of QuantumBlack. The License does not grant you any right or
# license to the QuantumBlack Trademarks. You may not use the QuantumBlack
# Trademarks or any confusingly similar mark as a trademark for your product,
# or use the QuantumBlack Trademarks in any other manner that might cause
# confusion in the marketplace, including but not limited to in advertising,
# on websites, or on software.
#
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=too-many-lines

"""A collection of CLI commands for working with Kedro pipelines."""
import json
import re
import shutil
import sys
import tempfile
from importlib import import_module
from pathlib import Path
from textwrap import indent
from typing import Any, List, NamedTuple, Optional, Tuple, Union
from zipfile import ZipFile

import click
import pkg_resources
import yaml
from rope.base.project import Project
from rope.contrib import generate
from rope.refactor.move import MoveModule
from rope.refactor.rename import Rename
from setuptools.dist import Distribution

import kedro
from kedro.framework.cli.utils import (
    KedroCliError,
    _clean_pycache,
    _filter_deprecation_warnings,
    _get_requirements_in,
    call,
    command_with_verbosity,
    env_option,
    python_call,
)
from kedro.framework.project import pipelines, settings
from kedro.framework.startup import ProjectMetadata

_SETUP_PY_TEMPLATE = """# -*- coding: utf-8 -*-
from setuptools import setup, find_packages

setup(
    name="{name}",
    version="{version}",
    description="Modular pipeline `{name}`",
    packages=find_packages(),
    include_package_data=True,
    package_data={package_data},
    install_requires={install_requires},
)
"""

PipelineArtifacts = NamedTuple(
    "PipelineArtifacts",
    [("pipeline_dir", Path), ("pipeline_tests", Path), ("pipeline_conf", Path)],
)
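
# For a pipeline `ds` in the `base` environment, `_get_pipeline_artifacts` below
# typically resolves this tuple to (illustrative paths):
#   pipeline_dir   -> src/<package_name>/pipelines/ds
#   pipeline_tests -> src/tests/pipelines/ds
#   pipeline_conf  -> conf/base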


def _assert_pkg_name_ok(pkg_name: str):
    """Check that python package name is in line with PEP8 requirements.

    Args:
        pkg_name: Candidate Python package name.

    Raises:
        KedroCliError: If package name violates the requirements.
    """

    base_message = f"`{pkg_name}` is not a valid Python package name."
    if not re.match(r"^[a-zA-Z_]", pkg_name):
        message = base_message + " It must start with a letter or underscore."
        raise KedroCliError(message)
    if len(pkg_name) < 2:
        message = base_message + " It must be at least 2 characters long."
        raise KedroCliError(message)
    if not re.match(r"^\w+$", pkg_name[1:]):
        message = (
            base_message + " It must contain only letters, digits, and/or underscores."
        )
        raise KedroCliError(message)
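
# A sketch of the checks above (illustrative):
#   _assert_pkg_name_ok("data_science")  # passes: letters, digits, underscores
#   _assert_pkg_name_ok("0_science")     # fails: must start with a letter or _
#   _assert_pkg_name_ok("x")             # fails: shorter than 2 characters
#   _assert_pkg_name_ok("data-science")  # fails: `-` is not a word character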


def _check_pipeline_name(ctx, param, value):  # pylint: disable=unused-argument
    if value:
        _assert_pkg_name_ok(value)
    return value


# pylint: disable=missing-function-docstring
@click.group(name="Kedro")
def pipeline_cli():  # pragma: no cover
    pass


@pipeline_cli.group()
def pipeline():
    """Commands for working with pipelines."""


@command_with_verbosity(pipeline, "create")
@click.argument("name", nargs=1, callback=_check_pipeline_name)
@click.option(
    "--skip-config",
    is_flag=True,
    help="Skip creation of config files for the new pipeline(s).",
)
@env_option(help="Environment to create pipeline configuration in. Defaults to `base`.")
@click.pass_obj  # this will pass the metadata as first argument
def create_pipeline(
    metadata: ProjectMetadata, name, skip_config, env, **kwargs
):  # pylint: disable=unused-argument
    """Create a new modular pipeline by providing a name."""
    package_dir = metadata.source_dir / metadata.package_name
    conf_root = settings.CONF_ROOT
    project_conf_path = metadata.project_path / conf_root

    env = env or "base"
    if not skip_config and not (project_conf_path / env).exists():
        raise KedroCliError(
            f"Unable to locate environment `{env}`. "
            f"Make sure it exists in the project configuration."
        )

    result_path = _create_pipeline(name, package_dir / "pipelines")
    _copy_pipeline_tests(name, result_path, package_dir)
    _copy_pipeline_configs(result_path, project_conf_path, skip_config, env=env)
    click.secho(f"\nPipeline `{name}` was successfully created.\n", fg="green")

    click.secho(
        f"To be able to run the pipeline `{name}`, you will need to add it "
        f"to `register_pipelines()` in `{package_dir / 'pipeline_registry.py'}`.",
        fg="yellow",
    )
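
# Example usage (illustrative): `kedro pipeline create data_processing` scaffolds
# `src/<package_name>/pipelines/data_processing/`, matching tests under
# `src/tests/pipelines/data_processing/` and default config under `conf/base/`.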


@command_with_verbosity(pipeline, "delete")
@click.argument("name", nargs=1, callback=_check_pipeline_name)
@env_option(
    help="Environment to delete pipeline configuration from. Defaults to `base`."
)
@click.option(
    "-y", "--yes", is_flag=True, help="Confirm deletion of pipeline non-interactively."
)
@click.pass_obj  # this will pass the metadata as first argument
def delete_pipeline(
    metadata: ProjectMetadata, name, env, yes, **kwargs
):  # pylint: disable=unused-argument
    """Delete a modular pipeline by providing a name."""
    package_dir = metadata.source_dir / metadata.package_name
    conf_root = settings.CONF_ROOT
    project_conf_path = metadata.project_path / conf_root

    env = env or "base"
    if not (project_conf_path / env).exists():
        raise KedroCliError(
            f"Unable to locate environment `{env}`. "
            f"Make sure it exists in the project configuration."
        )

    pipeline_artifacts = _get_pipeline_artifacts(metadata, pipeline_name=name, env=env)

    files_to_delete = [
        pipeline_artifacts.pipeline_conf / confdir / f"{name}.yml"
        for confdir in ("parameters", "catalog")
        if (pipeline_artifacts.pipeline_conf / confdir / f"{name}.yml").is_file()
    ]
    dirs_to_delete = [
        path
        for path in (pipeline_artifacts.pipeline_dir, pipeline_artifacts.pipeline_tests)
        if path.is_dir()
    ]

    if not files_to_delete and not dirs_to_delete:
        raise KedroCliError(f"Pipeline `{name}` not found.")

    if not yes:
        _echo_deletion_warning(
            "The following paths will be removed:",
            directories=dirs_to_delete,
            files=files_to_delete,
        )
        click.echo()
        yes = click.confirm(f"Are you sure you want to delete pipeline `{name}`?")
        click.echo()

    if not yes:
        raise KedroCliError("Deletion aborted!")

    _delete_artifacts(*files_to_delete, *dirs_to_delete)
    click.secho(f"\nPipeline `{name}` was successfully deleted.", fg="green")
    click.secho(
        f"\nIf you added the pipeline `{name}` to `register_pipelines()` in "
        f"`{package_dir / 'pipeline_registry.py'}`, you will need to remove it.",
        fg="yellow",
    )
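
# Example usage (illustrative): `kedro pipeline delete data_processing -y` removes
# the pipeline source, its tests, and its `conf/<env>` parameters/catalog files
# without prompting for confirmation.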


@pipeline.command("list")
def list_pipelines():
    """List all pipelines defined in your pipeline_registry.py file. (DEPRECATED)"""
    deprecation_message = (
        "DeprecationWarning: Command `kedro pipeline list` is deprecated. "
        "Please use `kedro registry list` instead."
    )
    click.secho(deprecation_message, fg="red")

    click.echo(yaml.dump(sorted(pipelines)))


@command_with_verbosity(pipeline, "describe")
@click.argument("name", nargs=1, default="__default__")
@click.pass_obj
def describe_pipeline(
    metadata: ProjectMetadata, name, **kwargs
):  # pylint: disable=unused-argument, protected-access
    """Describe a pipeline by providing a pipeline name.
    Defaults to the __default__ pipeline. (DEPRECATED)
    """
    deprecation_message = (
        "DeprecationWarning: Command `kedro pipeline describe` is deprecated. "
        "Please use `kedro registry describe` instead."
    )
    click.secho(deprecation_message, fg="red")

    pipeline_obj = pipelines.get(name)
    if not pipeline_obj:
        all_pipeline_names = pipelines.keys()
        existing_pipelines = ", ".join(sorted(all_pipeline_names))
        raise KedroCliError(
            f"`{name}` pipeline not found. Existing pipelines: [{existing_pipelines}]"
        )

    nodes = []
    for node in pipeline_obj.nodes:
        namespace = f"{node.namespace}." if node.namespace else ""
        nodes.append(f"{namespace}{node._name or node._func_name} ({node._func_name})")
    result = {"Nodes": nodes}

    click.echo(yaml.dump(result))


@command_with_verbosity(pipeline, "pull")
@click.argument("package_path", nargs=1, required=False)
@click.option(
    "--all",
    "-a",
    "all_flag",
    is_flag=True,
    help="Pull and unpack all pipelines in the `pyproject.toml` package manifest section.",
)
@env_option(
    help="Environment to install the pipeline configuration to. Defaults to `base`."
)
@click.option(
    "--alias",
    type=str,
    default="",
    callback=_check_pipeline_name,
    help="Alternative name to unpackage under.",
)
@click.option(
    "--fs-args",
    type=click.Path(
        exists=True, file_okay=True, dir_okay=False, readable=True, resolve_path=True
    ),
    default=None,
    help="Location of a configuration file for the fsspec filesystem used to pull the package.",
)
@click.pass_obj  # this will pass the metadata as first argument
def pull_package(  # pylint:disable=unused-argument, too-many-arguments
    metadata: ProjectMetadata, package_path, env, alias, fs_args, all_flag, **kwargs
) -> None:
    """Pull and unpack a modular pipeline in your project."""
    if not package_path and not all_flag:
        click.secho(
            "Please specify a package path or add '--all' to pull all pipelines in the "
            "`pyproject.toml` package manifest section."
        )
        sys.exit(1)

    if all_flag:
        _pull_packages_from_manifest(metadata)
        return

    _pull_package(package_path, metadata, env=env, alias=alias, fs_args=fs_args)
    as_alias = f" as `{alias}`" if alias else ""
    message = f"Pipeline {package_path} pulled and unpacked{as_alias}!"
    click.secho(message, fg="green")


def _pull_package(
    package_path: str,
    metadata: ProjectMetadata,
    env: Optional[str] = None,
    alias: Optional[str] = None,
    fs_args: Optional[str] = None,
):
    with tempfile.TemporaryDirectory() as temp_dir:
        temp_dir_path = Path(temp_dir).resolve()

        _unpack_wheel(package_path, temp_dir_path, fs_args)

        dist_info_file = list(temp_dir_path.glob("*.dist-info"))
        if len(dist_info_file) != 1:
            raise KedroCliError(
                f"Expected exactly one dist-info directory in the package pulled "
                f"from {package_path}, but found {len(dist_info_file)}."
            )
        # Extract package name, based on the naming convention for wheel files
        # https://www.python.org/dev/peps/pep-0427/#file-name-convention
        package_name = dist_info_file[0].stem.split("-")[0]
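        # e.g. for "ds-0.1.dist-info", `stem` is "ds-0.1", so `package_name` is "ds"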
        package_metadata = dist_info_file[0] / "METADATA"

        _clean_pycache(temp_dir_path)
        _install_files(metadata, package_name, temp_dir_path, env, alias)

        req_pattern = r"Requires-Dist: (.*?)\n"
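        # METADATA lines look like `Requires-Dist: pandas (>=1.0,<2.0)`; the
        # regex captures everything after the prefix on each such line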
        package_reqs = re.findall(req_pattern, package_metadata.read_text())
        if package_reqs:
            requirements_in = _get_requirements_in(
                metadata.source_dir, create_empty=True
            )
            _append_package_reqs(requirements_in, package_reqs, package_name)


def _pull_packages_from_manifest(metadata: ProjectMetadata) -> None:
    # pylint: disable=import-outside-toplevel
    import anyconfig  # for performance reasons

    config_dict = anyconfig.load(metadata.config_file)
    config_dict = config_dict["tool"]["kedro"]
    build_specs = config_dict.get("pipeline", {}).get("pull")

    if not build_specs:
        click.secho(
            "Nothing to pull. Please update the `pyproject.toml` package manifest section.",
            fg="yellow",
        )
        return

    for package_path, specs in build_specs.items():
        if "alias" in specs:
            _assert_pkg_name_ok(specs["alias"])
        _pull_package(package_path, metadata, **specs)
        click.secho(f"Pulled and unpacked `{package_path}`!")

    click.secho("Pipelines pulled and unpacked!", fg="green")


def _package_pipelines_from_manifest(metadata: ProjectMetadata) -> None:
    # pylint: disable=import-outside-toplevel
    import anyconfig  # for performance reasons

    config_dict = anyconfig.load(metadata.config_file)
    config_dict = config_dict["tool"]["kedro"]
    build_specs = config_dict.get("pipeline", {}).get("package")

    if not build_specs:
        click.secho(
            "Nothing to package. Please update the `pyproject.toml` package manifest section.",
            fg="yellow",
        )
        return

    for pipeline_name, specs in build_specs.items():
        if "alias" in specs:
            _assert_pkg_name_ok(specs["alias"])
        _package_pipeline(pipeline_name, metadata, **specs)
        click.secho(f"Packaged `{pipeline_name}` pipeline!")

    click.secho("Pipelines packaged!", fg="green")


@pipeline.command("package")
@env_option(
    help="Environment where the pipeline configuration lives. Defaults to `base`."
)
@click.option(
    "--alias",
    type=str,
    default="",
    callback=_check_pipeline_name,
    help="Alternative name to package under.",
)
@click.option(
    "-d",
    "--destination",
    type=click.Path(resolve_path=True, file_okay=False),
    help="Location where to create the wheel file. Defaults to `src/dist`.",
)
@click.option(
    "-v",
    "--version",
    type=str,
    help="Version to package under. "
    "Defaults to pipeline package version or, "
    "if that is not defined, the project package version.",
)
@click.option(
    "--all",
    "-a",
    "all_flag",
    is_flag=True,
    help="Package all pipelines in the `pyproject.toml` package manifest section.",
)
@click.argument("name", nargs=1, required=False)
@click.pass_obj  # this will pass the metadata as first argument
def package_pipeline(
    metadata: ProjectMetadata, name, env, alias, destination, version, all_flag
):  # pylint: disable=too-many-arguments
    """Package up a modular pipeline as a Python .whl."""
    if not name and not all_flag:
        click.secho(
            "Please specify a pipeline name or add '--all' to package all pipelines in "
            "the `pyproject.toml` package manifest section."
        )
        sys.exit(1)

    if all_flag:
        _package_pipelines_from_manifest(metadata)
        return

    result_path = _package_pipeline(
        name, metadata, alias=alias, destination=destination, env=env, version=version
    )

    as_alias = f" as `{alias}`" if alias else ""
    message = f"Pipeline `{name}` packaged{as_alias}! Location: {result_path}"
    click.secho(message, fg="green")


def _echo_deletion_warning(message: str, **paths: List[Path]):
    paths = {key: values for key, values in paths.items() if values}

    if paths:
        click.secho(message, bold=True)

    for key, values in paths.items():
        click.echo(f"\n{key.capitalize()}:")
        paths_str = "\n".join(str(value) for value in values)
        click.echo(indent(paths_str, " " * 2))


def _get_fsspec_filesystem(location: str, fs_args: Optional[str]):
    # pylint: disable=import-outside-toplevel
    import anyconfig
    import fsspec

    from kedro.io.core import get_protocol_and_path

    protocol, _ = get_protocol_and_path(location)
    fs_args_config = anyconfig.load(fs_args) if fs_args else {}

    try:
        return fsspec.filesystem(protocol, **fs_args_config)
    except Exception as exc:  # pylint: disable=broad-except
        # Specified protocol is not supported by `fsspec`
        # or requires extra dependencies
        click.secho(str(exc), fg="red")
        click.secho("Trying to use 'pip download'...", fg="red")
        return None
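
# The `fs_args` file may be anything `anyconfig` can load, e.g. a YAML file of
# keyword arguments for the fsspec filesystem (illustrative, for `s3://` paths):
#
#     # fs_args.yml
#     client_kwargs:
#       endpoint_url: http://127.0.0.1:9000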


def _unpack_wheel(location: str, destination: Path, fs_args: Optional[str]) -> None:
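    # `location` may be a local or remote .whl path (handled via fsspec below),
    # or any `pip download`-compatible requirement, e.g. a PyPI package name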
    filesystem = _get_fsspec_filesystem(location, fs_args)

    if location.endswith(".whl") and filesystem and filesystem.exists(location):
        with filesystem.open(location) as fs_file:
            # pylint: disable=consider-using-with
            ZipFile(fs_file).extractall(destination)
    else:
        python_call(
            "pip", ["download", "--no-deps", "--dest", str(destination), location]
        )
        wheel_file = list(destination.glob("*.whl"))
        # `--no-deps` should fetch exactly one wheel file, and the CLI should
        # fail if that's not the case.
        if len(wheel_file) != 1:
            file_names = [wf.name for wf in wheel_file]
            raise KedroCliError(
                f"Expected exactly one wheel file from {location}, "
                f"but found: {file_names}."
            )
        # pylint: disable=consider-using-with
        ZipFile(wheel_file[0]).extractall(destination)


def _rename_files(conf_source: Path, old_name: str, new_name: str):
    config_files_to_rename = (
        each
        for each in conf_source.rglob("*")
        if each.is_file() and old_name in each.name
    )
    for config_file in config_files_to_rename:
        new_config_name = config_file.name.replace(old_name, new_name)
        config_file.rename(config_file.parent / new_config_name)


def _refactor_code_for_unpacking(
    project: Project,
    package_path: Path,
    tests_path: Path,
    alias: Optional[str],
    project_metadata: ProjectMetadata,
) -> Tuple[Path, Path]:
    """This is the reverse operation of `_refactor_code_for_package`, i.e
    we go from:
    <temp_dir>  # also the root of the Rope project
    |__ <pipeline_name>  # or <alias>
        |__ __init__.py
    |__ tests  # only tests for <pipeline_name>
        |__ __init__.py
        |__ test_pipeline.py

    to:
    <temp_dir>  # also the root of the Rope project
    |__ <package>
        |__ __init__.py
        |__ pipelines
            |__ __init__.py
            |__ <pipeline_name>
                |__ __init__.py
    |__ tests
        |__ __init__.py
        |__ pipelines
            |__ __init__.py
            |__ <pipeline_name>
                |__ __init__.py
    """
    pipeline_name = package_path.stem
    if alias:
        _rename_package(project, pipeline_name, alias)
        pipeline_name = alias

    package_target = Path(project_metadata.package_name) / "pipelines"
    full_path = _create_nested_package(project, package_target)
    _move_package(project, pipeline_name, package_target.as_posix())
    refactored_package_path = full_path / pipeline_name

    if not tests_path.exists():
        return refactored_package_path, tests_path

    # we can't rename the tests package to <pipeline_name>
    # because it would conflict with the existing top-level package;
    # hence we give it a temporary name, create the expected
    # nested folder structure, move the contents there,
    # then rename the temporary name to <pipeline_name>.
    _rename_package(project, "tests", "tmp_name")
    tests_target = Path("tests") / "pipelines"
    full_path = _create_nested_package(project, tests_target)
    _move_package(project, "tmp_name", tests_target.as_posix())
    _rename_package(project, (tests_target / "tmp_name").as_posix(), pipeline_name)
    refactored_tests_path = full_path / pipeline_name

    return refactored_package_path, refactored_tests_path


def _install_files(
    project_metadata: ProjectMetadata,
    package_name: str,
    source_path: Path,
    env: Optional[str] = None,
    alias: Optional[str] = None,
):
    env = env or "base"

    package_source, test_source, conf_source = _get_package_artifacts(
        source_path, package_name
    )

    if conf_source.is_dir() and alias:
        _rename_files(conf_source, package_name, alias)

    pipeline_name = alias or package_name
    package_dest, test_dest, conf_dest = _get_pipeline_artifacts(
        project_metadata, pipeline_name=pipeline_name, env=env
    )

    if conf_source.is_dir():
        _sync_dirs(conf_source, conf_dest)
        # `config` dir was packaged under `package_name` directory with
        # `kedro pipeline package`. Since `config` was already synced,
        # we don't want to copy it again when syncing the package, so we remove it.
        shutil.rmtree(str(conf_source))

    project = Project(source_path)
    refactored_package_source, refactored_test_source = _refactor_code_for_unpacking(
        project, package_source, test_source, alias, project_metadata
    )
    project.close()

    if refactored_test_source.is_dir():
        _sync_dirs(refactored_test_source, test_dest)

    # Sync everything under package directory, except `config`
    # since it has already been copied.
    if refactored_package_source.is_dir():
        _sync_dirs(refactored_package_source, package_dest)


def _find_config_files(
    source_config_dir: Path, glob_patterns: List[str]
) -> List[Tuple[Path, str]]:
    config_files = []  # type: List[Tuple[Path, str]]

    if source_config_dir.is_dir():
        config_files = [
            (path, path.parent.relative_to(source_config_dir).as_posix())
            for glob_pattern in glob_patterns
            for path in source_config_dir.glob(glob_pattern)
            if path.is_file()
        ]

    return config_files


def _get_default_version(metadata: ProjectMetadata, pipeline_name: str) -> str:
    # default to pipeline package version
    try:
        pipeline_module = import_module(
            f"{metadata.package_name}.pipelines.{pipeline_name}"
        )
        return pipeline_module.__version__  # type: ignore
    except (AttributeError, ModuleNotFoundError):
        # if pipeline version doesn't exist, take the project one
        project_module = import_module(f"{metadata.package_name}")
        return project_module.__version__  # type: ignore


def _package_pipeline(  # pylint: disable=too-many-arguments
    pipeline_name: str,
    metadata: ProjectMetadata,
    alias: Optional[str] = None,
    destination: Optional[str] = None,
    env: Optional[str] = None,
    version: Optional[str] = None,
) -> Path:
    package_dir = metadata.source_dir / metadata.package_name
    env = env or "base"

    artifacts_to_package = _get_pipeline_artifacts(
        metadata, pipeline_name=pipeline_name, env=env
    )
    # As the wheel file will only contain parameters, we aren't listing other
    # config files, so as not to confuse users and to avoid useless file copies.
    configs_to_package = _find_config_files(
        artifacts_to_package.pipeline_conf,
        [f"parameters*/**/{pipeline_name}.yml", f"parameters*/**/{pipeline_name}/*"],
    )

    source_paths = (
        artifacts_to_package.pipeline_dir,
        artifacts_to_package.pipeline_tests,
        configs_to_package,
    )

    # Check that the pipeline directory exists and is not empty
    _validate_dir(artifacts_to_package.pipeline_dir)

    destination = Path(destination) if destination else package_dir.parent / "dist"
    version = version or _get_default_version(metadata, pipeline_name)

    _generate_wheel_file(
        pipeline_name=pipeline_name,
        destination=destination.resolve(),
        source_paths=source_paths,
        version=version,
        metadata=metadata,
        alias=alias,
    )

    _clean_pycache(package_dir)
    _clean_pycache(metadata.project_path)

    return destination


def _validate_dir(path: Path) -> None:
    if not path.is_dir():
        raise KedroCliError(f"Directory '{path}' doesn't exist.")
    if not list(path.iterdir()):
        raise KedroCliError(f"'{path}' is an empty directory.")


def _get_wheel_name(**kwargs: Any) -> str:
    # https://stackoverflow.com/q/51939257/3364156
    dist = Distribution(attrs=kwargs)
    bdist_wheel_cmd = dist.get_command_obj("bdist_wheel")
    bdist_wheel_cmd.ensure_finalized()

    distname = bdist_wheel_cmd.wheel_dist_name
    tag = "-".join(bdist_wheel_cmd.get_tag())
    return f"{distname}-{tag}.whl"
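
# For example, `_get_wheel_name(name="ds", version="0.1")` typically returns
# "ds-0.1-py3-none-any.whl" (the exact tag depends on the local environment).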


def _sync_path_list(source: List[Tuple[Path, str]], target: Path) -> None:
    for source_path, suffix in source:
        target_with_suffix = (target / suffix).resolve()
        _sync_dirs(source_path, target_with_suffix)


def _make_install_requires(requirements_txt: Path) -> List[str]:
    """Parses each line of requirements.txt into a version specifier valid to put in
    install_requires."""
    if not requirements_txt.exists():
        return []
    requirements = pkg_resources.parse_requirements(requirements_txt.read_text())
    return [str(requirement) for requirement in requirements]


def _create_nested_package(project: Project, package_path: Path) -> Path:
    # fails if parts of the path already exist
    packages = package_path.parts
    parent = generate.create_package(project, packages[0])
    nested_path = Path(project.address) / packages[0]
    for package in packages[1:]:
        parent = generate.create_package(project, package, sourcefolder=parent)
        nested_path = nested_path / package
    return nested_path


def _move_package(project: Project, source: str, target: str) -> None:
    """
    Move a Python package, refactoring relevant imports along the way.
    A target of an empty string means moving to the root of the `project`.

    Args:
        project: rope.base.Project holding the scope of the refactoring.
        source: Name of the Python package to be moved. Can be a fully
            qualified module path relative to the `project` root, e.g.
            "package.pipelines.pipeline" or "package/pipelines/pipeline".
        target: Destination of the Python package to be moved. Can be a fully
            qualified module path relative to the `project` root, e.g.
            "package.pipelines.pipeline" or "package/pipelines/pipeline".
    """
    src_folder = project.get_module(source).get_resource()
    target_folder = project.get_module(target).get_resource()
    change = MoveModule(project, src_folder).get_changes(dest=target_folder)
    project.do(change)


def _rename_package(project: Project, old_name: str, new_name: str) -> None:
    """
    Rename a Python package, refactoring relevant imports along the way,
    as well as references in comments.

    Args:
        project: rope.base.Project holding the scope of the refactoring.
        old_name: Old module name. Can be a fully qualified module path,
            e.g. "package.pipelines.pipeline" or "package/pipelines/pipeline",
            relative to the `project` root.
        new_name: New module name. Can't be a fully qualified module path.
    """
    folder = project.get_folder(old_name)
    change = Rename(project, folder).get_changes(new_name, docs=True)
    project.do(change)


def _refactor_code_for_package(
    project: Project,
    package_path: Path,
    tests_path: Path,
    alias: Optional[str],
    project_metadata: ProjectMetadata,
) -> None:
    """In order to refactor the imports properly, we need to recreate
    the same nested structure as in the project. Therefore, we create:
    <temp_dir>  # also the root of the Rope project
    |__ <package>
        |__ __init__.py
        |__ pipelines
            |__ __init__.py
            |__ <pipeline_name>
                |__ __init__.py
    |__ tests
        |__ __init__.py
        |__ pipelines
            |__ __init__.py
            |__ <pipeline_name>
                |__ __init__.py
    We then move <pipeline_name> out of the package source to the top level
    ("") in temp_dir, and rename the folder & imports if an alias is provided.

    For tests, we need to extract all the contents of <pipeline_name>
    into the top-level `tests` folder. This is not possible in one go with
    the Rope API, so we have to do it in a slightly hacky way.
    We rename <pipeline_name> to `tmp_name` and move it to the top level ("")
    in temp_dir. We then remove the old `tests` folder and rename `tmp_name` to `tests`.

    The final structure should be:
    <temp_dir>  # also the root of the Rope project
    |__ <pipeline_name>  # or <alias>
        |__ __init__.py
    |__ tests  # only tests for <pipeline_name>
        |__ __init__.py
        |__ test_pipeline.py
    """
    # Copy source in appropriate folder structure
    package_target = package_path.relative_to(project_metadata.source_dir)
    full_path = _create_nested_package(project, package_target)
    # overwrite=True to update the __init__.py files generated by create_package
    _sync_dirs(package_path, full_path, overwrite=True)

    # Copy tests in appropriate folder structure
    if tests_path.exists():
        tests_target = tests_path.relative_to(project_metadata.source_dir)
        full_path = _create_nested_package(project, tests_target)
        # overwrite=True to update the __init__.py files generated by create_package
        _sync_dirs(tests_path, full_path, overwrite=True)

    # Refactor imports in src/package_name/pipelines/pipeline_name
    # and imports of `pipeline_name` in tests
    _move_package(project, package_target.as_posix(), "")
    if alias:
        pipeline_name = package_target.stem
        _rename_package(project, pipeline_name, alias)

    if tests_path.exists():
        # we can't move the relevant tests folder as is because
        # it will conflict with the top-level package <pipeline_name>;
        # we can't rename it "tests" and move it, because it will conflict
        # with the existing "tests" folder at top level;
        # hence we give it a temp name, move it, delete tests/ and
        # rename the temp name to tests.
        tmp_name = "extracted_tests"
        tmp_module = tests_target.parent / tmp_name
        _rename_package(project, tests_target.as_posix(), tmp_name)
        _move_package(project, tmp_module.as_posix(), "")
        shutil.rmtree(Path(project.address) / "tests")
        _rename_package(project, tmp_name, "tests")

    shutil.rmtree(Path(project.address) / project_metadata.package_name)


_SourcePathType = Union[Path, List[Tuple[Path, str]]]


# pylint: disable=too-many-arguments,too-many-locals
def _generate_wheel_file(
    pipeline_name: str,
    destination: Path,
    source_paths: Tuple[_SourcePathType, ...],
    version: str,
    metadata: ProjectMetadata,
    alias: Optional[str] = None,
) -> None:
    package_name = alias or pipeline_name
    package_source, tests_source, conf_source = source_paths

    with tempfile.TemporaryDirectory() as temp_dir:
        temp_dir_path = Path(temp_dir).resolve()

        project = Project(temp_dir_path)  # project where to do refactoring
        _refactor_code_for_package(  # type: ignore
            project, package_source, tests_source, alias, metadata
        )
        project.close()

        # Copy & "refactor" config
        _, _, conf_target = _get_package_artifacts(temp_dir_path, package_name)
        _sync_path_list(conf_source, conf_target)  # type: ignore
        if conf_target.is_dir() and alias:
            _rename_files(conf_target, pipeline_name, alias)

        # Build a setup.py on the fly
        try:
            install_requires = _make_install_requires(
                package_source / "requirements.txt"  # type: ignore
            )
        except Exception as exc:
            click.secho("FAILED", fg="red")
            cls = exc.__class__
            raise KedroCliError(f"{cls.__module__}.{cls.__qualname__}: {exc}") from exc

        setup_file = _generate_setup_file(
            package_name, version, install_requires, temp_dir_path
        )

        package_file = destination / _get_wheel_name(name=package_name, version=version)
        if package_file.is_file():
            click.secho(
                f"Package file {package_file} will be overwritten!", fg="yellow"
            )

        # python setup.py bdist_wheel --dist-dir <destination>
        call(
            [
                sys.executable,
                str(setup_file.resolve()),
                "bdist_wheel",
                "--dist-dir",
                str(destination),
            ],
            cwd=temp_dir,
        )


def _generate_setup_file(
    package_name: str, version: str, install_requires: List[str], output_dir: Path
) -> Path:
    setup_file = output_dir / "setup.py"
    package_data = {
        package_name: [
            "README.md",
            "config/parameters*",
            "config/**/parameters*",
            "config/parameters*/**",
            "config/parameters*/**/*",
        ]
    }
    setup_file_context = dict(
        name=package_name,
        version=version,
        package_data=json.dumps(package_data),
        install_requires=install_requires,
    )

    setup_file.write_text(_SETUP_PY_TEMPLATE.format(**setup_file_context))
    return setup_file
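
# For `package_name="ds"` and `version="0.1"`, the generated setup.py looks
# roughly like (illustrative):
#
#     setup(
#         name="ds",
#         version="0.1",
#         description="Modular pipeline `ds`",
#         packages=find_packages(),
#         include_package_data=True,
#         package_data={"ds": ["README.md", "config/parameters*", ...]},
#         install_requires=[],
#     )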


def _create_pipeline(name: str, output_dir: Path) -> Path:
    with _filter_deprecation_warnings():
        # pylint: disable=import-outside-toplevel
        from cookiecutter.main import cookiecutter

    template_path = Path(kedro.__file__).parent / "templates" / "pipeline"
    cookie_context = {"pipeline_name": name, "kedro_version": kedro.__version__}

    click.echo(f"Creating the pipeline `{name}`: ", nl=False)

    try:
        result_path = cookiecutter(
            str(template_path),
            output_dir=str(output_dir),
            no_input=True,
            extra_context=cookie_context,
        )
    except Exception as exc:
        click.secho("FAILED", fg="red")
        cls = exc.__class__
        raise KedroCliError(f"{cls.__module__}.{cls.__qualname__}: {exc}") from exc

    click.secho("OK", fg="green")
    result_path = Path(result_path)
    message = indent(f"Location: `{result_path.resolve()}`", " " * 2)
    click.secho(message, bold=True)

    _clean_pycache(result_path)

    return result_path


# pylint: disable=missing-raises-doc
def _sync_dirs(source: Path, target: Path, prefix: str = "", overwrite: bool = False):
    """Recursively copies `source` directory (or file) into `target` directory without
    overwriting any existing files/directories in the target using the following
    rules:
        1) Skip any files/directories which names match with files in target,
        unless overwrite=True.
        2) Copy all files from source to target.
        3) Recursively copy all directories from source to target.

    Args:
        source: A local directory to copy from, must exist.
        target: A local directory to copy to, will be created if doesn't exist yet.
        prefix: Prefix for CLI message indentation.
    """

    existing = list(target.iterdir()) if target.is_dir() else []
    existing_files = {f.name for f in existing if f.is_file()}
    existing_folders = {f.name for f in existing if f.is_dir()}

    if source.is_dir():
        content = list(source.iterdir())
    elif source.is_file():
        content = [source]
    else:
        # nothing to copy
        content = []  # pragma: no cover

    for source_path in content:
        source_name = source_path.name
        target_path = target / source_name
        click.echo(indent(f"Creating `{target_path}`: ", prefix), nl=False)

        if (  # rule #1
            (not overwrite and source_name in existing_files)
            or (source_path.is_file() and source_name in existing_folders)
        ):
            click.secho("SKIPPED (already exists)", fg="yellow")
        elif source_path.is_file():  # rule #2
            try:
                target.mkdir(exist_ok=True, parents=True)
                shutil.copyfile(str(source_path), str(target_path))
            except Exception:
                click.secho("FAILED", fg="red")
                raise
            click.secho("OK", fg="green")
        else:  # source_path is a directory, rule #3
            click.echo()
            new_prefix = (prefix or "") + " " * 2
            _sync_dirs(source_path, target_path, prefix=new_prefix)


def _get_pipeline_artifacts(
    project_metadata: ProjectMetadata, pipeline_name: str, env: str
) -> PipelineArtifacts:
    """From existing project, returns in order: source_path, tests_path, config_paths"""
    package_dir = project_metadata.source_dir / project_metadata.package_name
    conf_root = settings.CONF_ROOT
    project_conf_path = project_metadata.project_path / conf_root
    artifacts = PipelineArtifacts(
        package_dir / "pipelines" / pipeline_name,
        package_dir.parent / "tests" / "pipelines" / pipeline_name,
        project_conf_path / env,
    )
    return artifacts


def _get_package_artifacts(
    source_path: Path, package_name: str
) -> Tuple[Path, Path, Path]:
    """From existing unpacked wheel, returns in order:
    source_path, tests_path, config_path
    """
    artifacts = (
        source_path / package_name,
        source_path / "tests",
        # package_data (non-python files) needs to live inside one of the packages
        source_path / package_name / "config",
    )
    return artifacts


def _copy_pipeline_tests(pipeline_name: str, result_path: Path, package_dir: Path):
    tests_source = result_path / "tests"
    tests_target = package_dir.parent / "tests" / "pipelines" / pipeline_name
    try:
        _sync_dirs(tests_source, tests_target)
    finally:
        shutil.rmtree(tests_source)


def _copy_pipeline_configs(
    result_path: Path, conf_path: Path, skip_config: bool, env: str
):
    config_source = result_path / "config"
    try:
        if not skip_config:
            config_target = conf_path / env
            _sync_dirs(config_source, config_target)
    finally:
        shutil.rmtree(config_source)


def _delete_artifacts(*artifacts: Path):
    for artifact in artifacts:
        click.echo(f"Deleting `{artifact}`: ", nl=False)
        try:
            if artifact.is_dir():
                shutil.rmtree(artifact)
            else:
                artifact.unlink()
        except Exception as exc:
            click.secho("FAILED", fg="red")
            cls = exc.__class__
            raise KedroCliError(f"{cls.__module__}.{cls.__qualname__}: {exc}") from exc
        else:
            click.secho("OK", fg="green")


def _append_package_reqs(
    requirements_in: Path, package_reqs: List[str], pipeline_name: str
) -> None:
    """Appends modular pipeline requirements to project level requirements.in"""
    existing_reqs = pkg_resources.parse_requirements(requirements_in.read_text())
    new_reqs = pkg_resources.parse_requirements(package_reqs)
    reqs_to_add = set(new_reqs) - set(existing_reqs)
    if not reqs_to_add:
        return

    sorted_reqs = sorted(str(req) for req in reqs_to_add)
    with open(requirements_in, "a", encoding="utf-8") as file:
        file.write(
            f"\n\n# Additional requirements from modular pipeline `{pipeline_name}`:\n"
        )
        file.write("\n".join(sorted_reqs))
    click.secho(
        "Added the following requirements from modular pipeline `{}` to "
        "requirements.in:\n{}".format(pipeline_name, "\n".join(sorted_reqs))
    )
    click.secho(
        "Use `kedro install --build-reqs` to compile and install the updated list of "
        "requirements."
    )