"""This module provides context for Kedro project."""
from __future__ import annotations
import logging
from copy import deepcopy
from pathlib import Path, PurePosixPath, PureWindowsPath
from typing import Any
from urllib.parse import urlparse
from warnings import warn
from attrs import define, field
from omegaconf import OmegaConf
from pluggy import PluginManager
from kedro.config import AbstractConfigLoader, MissingConfigException
from kedro.framework.project import settings
from kedro.io import DataCatalog
from kedro.pipeline._transcoding import _transcode_split
def _is_relative_path(path_string: str) -> bool:
"""Checks whether a path string is a relative path.
Example:
::
>>> _is_relative_path("data/01_raw") == True
>>> _is_relative_path("info.log") == True
>>> _is_relative_path("/tmp/data/01_raw") == False
>>> _is_relative_path(r"C:\\info.log") == False
>>> _is_relative_path(r"\\'info.log") == False
>>> _is_relative_path("c:/info.log") == False
>>> _is_relative_path("s3://info.log") == False
Args:
path_string: The path string to check.
Returns:
Whether the string is a relative path.
"""
# os.path.splitdrive does not reliably work on non-Windows systems
# breaking the coverage, using PureWindowsPath instead
is_full_windows_path_with_drive = bool(PureWindowsPath(path_string).drive)
if is_full_windows_path_with_drive:
return False
is_remote_path = bool(urlparse(path_string).scheme)
if is_remote_path:
return False
is_absolute_path = PurePosixPath(path_string).is_absolute()
if is_absolute_path:
return False
return True
def _convert_paths_to_absolute_posix(
project_path: Path, conf_dictionary: dict[str, Any]
) -> dict[str, Any]:
"""Turn all relative paths inside ``conf_dictionary`` into absolute paths by appending them
to ``project_path`` and convert absolute Windows paths to POSIX format. This is a hack to
make sure that we don't have to change user's working directory for logging and datasets to
work. It is important for non-standard workflows such as IPython notebook where users don't go
through `kedro run` or `__main__.py` entrypoints.
Example:
::
>>> conf = _convert_paths_to_absolute_posix(
>>> project_path=Path("/path/to/my/project"),
>>> conf_dictionary={
>>> "handlers": {
>>> "info_file_handler": {
>>> "filename": "info.log"
>>> }
>>> }
>>> }
>>> )
>>> print(conf['handlers']['info_file_handler']['filename'])
"/path/to/my/project/info.log"
Args:
project_path: The root directory to prepend to relative path to make absolute path.
conf_dictionary: The configuration containing paths to expand.
Returns:
A dictionary containing only absolute paths.
Raises:
ValueError: If the provided ``project_path`` is not an absolute path.
"""
if not project_path.is_absolute():
raise ValueError(
f"project_path must be an absolute path. Received: {project_path}"
)
# only check a few conf keys that are known to specify a path string as value
conf_keys_with_filepath = ("filename", "filepath", "path")
for conf_key, conf_value in conf_dictionary.items():
# if the conf_value is another dictionary, absolutify its paths first.
if isinstance(conf_value, dict):
conf_dictionary[conf_key] = _convert_paths_to_absolute_posix(
project_path, conf_value
)
continue
# if the conf_value is not a dictionary nor a string, skip
if not isinstance(conf_value, str):
continue
# if the conf_value is a string but the conf_key isn't one associated with filepath, skip
if conf_key not in conf_keys_with_filepath:
continue
if _is_relative_path(conf_value):
# Absolute local path should be in POSIX format
conf_value_absolute_path = (project_path / conf_value).as_posix()
conf_dictionary[conf_key] = conf_value_absolute_path
elif PureWindowsPath(conf_value).drive:
# Convert absolute Windows path to POSIX format
conf_dictionary[conf_key] = PureWindowsPath(conf_value).as_posix()
return conf_dictionary
def _validate_transcoded_datasets(catalog: DataCatalog) -> None:
"""Validates transcoded datasets are correctly named
Args:
catalog (DataCatalog): The catalog object containing the
datasets to be validated.
Raises:
ValueError: If a dataset name does not conform to the expected
transcoding naming conventions,a ValueError is raised by the
`_transcode_split` function.
"""
for dataset_name in catalog._datasets.keys():
_transcode_split(dataset_name)
def _expand_full_path(project_path: str | Path) -> Path:
return Path(project_path).expanduser().resolve()
[docs]
@define(slots=False) # Enable setting new attributes to `KedroContext`
class KedroContext:
"""``KedroContext`` is the base class which holds the configuration and
Kedro's main functionality.
Create a context object by providing the root of a Kedro project and
the environment configuration subfolders (see ``kedro.config.OmegaConfigLoader``)
Raises:
KedroContextError: If there is a mismatch
between Kedro project version and package version.
Args:
project_path: Project path to define the context for.
config_loader: Kedro's ``OmegaConfigLoader`` for loading the configuration files.
env: Optional argument for configuration default environment to be used
for running the pipeline. If not specified, it defaults to "local".
package_name: Package name for the Kedro project the context is
created for.
hook_manager: The ``PluginManager`` to activate hooks, supplied by the session.
extra_params: Optional dictionary containing extra project parameters.
If specified, will update (and therefore take precedence over)
the parameters retrieved from the project configuration.
"""
project_path: Path = field(init=True, converter=_expand_full_path)
config_loader: AbstractConfigLoader = field(init=True)
env: str | None = field(init=True)
_package_name: str = field(init=True)
_hook_manager: PluginManager = field(init=True)
_extra_params: dict[str, Any] | None = field(
init=True, default=None, converter=deepcopy
)
@property
def catalog(self) -> DataCatalog:
"""Read-only property referring to Kedro's ``DataCatalog`` for this context.
Returns:
DataCatalog defined in `catalog.yml`.
Raises:
KedroContextError: Incorrect ``DataCatalog`` registered for the project.
"""
return self._get_catalog()
@property
def params(self) -> dict[str, Any]:
"""Read-only property referring to Kedro's parameters for this context.
Returns:
Parameters defined in `parameters.yml` with the addition of any
extra parameters passed at initialization.
"""
try:
params = self.config_loader["parameters"]
except MissingConfigException as exc:
warn(f"Parameters not found in your Kedro project config.\n{str(exc)}")
params = {}
if self._extra_params:
# Merge nested structures
params = OmegaConf.merge(params, self._extra_params)
return OmegaConf.to_container(params) if OmegaConf.is_config(params) else params # type: ignore[no-any-return]
def _get_catalog(
self,
save_version: str | None = None,
load_versions: dict[str, str] | None = None,
) -> DataCatalog:
"""A hook for changing the creation of a DataCatalog instance.
Returns:
DataCatalog defined in `catalog.yml`.
Raises:
KedroContextError: Incorrect ``DataCatalog`` registered for the project.
"""
# '**/catalog*' reads modular pipeline configs
conf_catalog = self.config_loader["catalog"]
# turn relative paths in conf_catalog into absolute paths
# before initializing the catalog
conf_catalog = _convert_paths_to_absolute_posix(
project_path=self.project_path, conf_dictionary=conf_catalog
)
conf_creds = self._get_config_credentials()
catalog: DataCatalog = settings.DATA_CATALOG_CLASS.from_config(
catalog=conf_catalog,
credentials=conf_creds,
load_versions=load_versions,
save_version=save_version,
)
feed_dict = self._get_feed_dict()
catalog.add_feed_dict(feed_dict)
_validate_transcoded_datasets(catalog)
self._hook_manager.hook.after_catalog_created(
catalog=catalog,
conf_catalog=conf_catalog,
conf_creds=conf_creds,
feed_dict=feed_dict,
save_version=save_version,
load_versions=load_versions,
)
return catalog
def _get_feed_dict(self) -> dict[str, Any]:
"""Get parameters and return the feed dictionary."""
params = self.params
feed_dict = {"parameters": params}
def _add_param_to_feed_dict(param_name: str, param_value: Any) -> None:
"""This recursively adds parameter paths to the `feed_dict`,
whenever `param_value` is a dictionary itself, so that users can
specify specific nested parameters in their node inputs.
Example:
>>> param_name = "a"
>>> param_value = {"b": 1}
>>> _add_param_to_feed_dict(param_name, param_value)
>>> assert feed_dict["params:a"] == {"b": 1}
>>> assert feed_dict["params:a.b"] == 1
"""
key = f"params:{param_name}"
feed_dict[key] = param_value
if isinstance(param_value, dict):
for key, val in param_value.items():
_add_param_to_feed_dict(f"{param_name}.{key}", val)
for param_name, param_value in params.items():
_add_param_to_feed_dict(param_name, param_value)
return feed_dict
def _get_config_credentials(self) -> dict[str, Any]:
"""Getter for credentials specified in credentials directory."""
try:
conf_creds: dict[str, Any] = self.config_loader["credentials"]
except MissingConfigException as exc:
logging.getLogger(__name__).debug(
"Credentials not found in your Kedro project config.\n %s", str(exc)
)
conf_creds = {}
return conf_creds
[docs]
class KedroContextError(Exception):
"""Error occurred when loading project and running context pipeline."""