"""This module provides context for Kedro project."""
import logging
from copy import deepcopy
from pathlib import Path, PurePosixPath, PureWindowsPath
from typing import Any, Dict, Optional, Union
from urllib.parse import urlparse
from warnings import warn
from pluggy import PluginManager
from kedro.config import ConfigLoader, MissingConfigException
from kedro.framework.project import settings
from kedro.io import DataCatalog
from kedro.pipeline.pipeline import _transcode_split
def _is_relative_path(path_string: str) -> bool:
"""Checks whether a path string is a relative path.
Example:
::
>>> _is_relative_path("data/01_raw") == True
>>> _is_relative_path("logs/info.log") == True
>>> _is_relative_path("/tmp/data/01_raw") == False
>>> _is_relative_path(r"C:\\logs\\info.log") == False
>>> _is_relative_path(r"\\logs\\'info.log") == False
>>> _is_relative_path("c:/logs/info.log") == False
>>> _is_relative_path("s3://logs/info.log") == False
Args:
path_string: The path string to check.
Returns:
Whether the string is a relative path.
"""
    # os.path.splitdrive does not reliably work on non-Windows systems
    # (and breaks test coverage there), so use PureWindowsPath instead
is_full_windows_path_with_drive = bool(PureWindowsPath(path_string).drive)
if is_full_windows_path_with_drive:
return False
is_remote_path = bool(urlparse(path_string).scheme)
if is_remote_path:
return False
is_absolute_path = PurePosixPath(path_string).is_absolute()
if is_absolute_path:
return False
return True
def _convert_paths_to_absolute_posix(
project_path: Path, conf_dictionary: Dict[str, Any]
) -> Dict[str, Any]:
"""Turn all relative paths inside ``conf_dictionary`` into absolute paths by appending them
to ``project_path`` and convert absolute Windows paths to POSIX format. This is a hack to
make sure that we don't have to change user's working directory for logging and datasets to
work. It is important for non-standard workflows such as IPython notebook where users don't go
through `kedro run` or `__main__.py` entrypoints.
Example:
::
>>> conf = _convert_paths_to_absolute_posix(
>>> project_path=Path("/path/to/my/project"),
>>> conf_dictionary={
>>> "handlers": {
>>> "info_file_handler": {
>>> "filename": "logs/info.log"
>>> }
>>> }
>>> }
>>> )
>>> print(conf['handlers']['info_file_handler']['filename'])
"/path/to/my/project/logs/info.log"
Args:
        project_path: The root directory to prepend to relative paths to make them absolute.
conf_dictionary: The configuration containing paths to expand.
Returns:
A dictionary containing only absolute paths.
Raises:
ValueError: If the provided ``project_path`` is not an absolute path.
"""
if not project_path.is_absolute():
raise ValueError(
f"project_path must be an absolute path. Received: {project_path}"
)
# only check a few conf keys that are known to specify a path string as value
conf_keys_with_filepath = ("filename", "filepath", "path")
for conf_key, conf_value in conf_dictionary.items():
# if the conf_value is another dictionary, absolutify its paths first.
if isinstance(conf_value, dict):
conf_dictionary[conf_key] = _convert_paths_to_absolute_posix(
project_path, conf_value
)
continue
# if the conf_value is not a dictionary nor a string, skip
if not isinstance(conf_value, str):
continue
# if the conf_value is a string but the conf_key isn't one associated with filepath, skip
if conf_key not in conf_keys_with_filepath:
continue
if _is_relative_path(conf_value):
# Absolute local path should be in POSIX format
conf_value_absolute_path = (project_path / conf_value).as_posix()
conf_dictionary[conf_key] = conf_value_absolute_path
elif PureWindowsPath(conf_value).drive:
# Convert absolute Windows path to POSIX format
conf_dictionary[conf_key] = PureWindowsPath(conf_value).as_posix()
return conf_dictionary
def _validate_layers_for_transcoding(catalog: DataCatalog) -> None:
"""Check that transcoded names that correspond to
the same dataset also belong to the same layer.
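    Example (illustrative; assumes a catalog whose ``layers`` mapping puts two
    transcoded variants of the same base name in different layers):
    ::

        >>> catalog.layers = {"raw": {"data@pandas"}, "model": {"data@spark"}}
        >>> _validate_layers_for_transcoding(catalog)  # raises ValueError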
"""
    def _find_conflicts():
        base_names_to_layer = {}
        for current_layer, dataset_names in catalog.layers.items():
            for name in dataset_names:
                base_name, _ = _transcode_split(name)
                # ``setdefault`` records the first layer seen for each base
                # name; a later occurrence in a different layer is a conflict.
                known_layer = base_names_to_layer.setdefault(base_name, current_layer)
                if current_layer != known_layer:
                    yield name
conflicting_datasets = sorted(_find_conflicts())
if conflicting_datasets:
error_str = ", ".join(conflicting_datasets)
raise ValueError(
f"Transcoded datasets should have the same layer. Mismatch found for: {error_str}"
)
def _update_nested_dict(old_dict: Dict[Any, Any], new_dict: Dict[Any, Any]) -> None:
"""Update a nested dict with values of new_dict.
Args:
old_dict: dict to be updated
new_dict: dict to use for updating old_dict
"""
for key, value in new_dict.items():
if key not in old_dict:
old_dict[key] = value
else:
if isinstance(old_dict[key], dict) and isinstance(value, dict):
_update_nested_dict(old_dict[key], value)
else:
old_dict[key] = value
class KedroContext:
"""``KedroContext`` is the base class which holds the configuration and
Kedro's main functionality.
"""
    def __init__(
self,
package_name: str,
project_path: Union[Path, str],
config_loader: ConfigLoader,
hook_manager: PluginManager,
        env: Optional[str] = None,
        extra_params: Optional[Dict[str, Any]] = None,
): # pylint: disable=too-many-arguments
"""Create a context object by providing the root of a Kedro project and
the environment configuration subfolders
(see ``kedro.config.ConfigLoader``)
Raises:
KedroContextError: If there is a mismatch
between Kedro project version and package version.
Args:
package_name: Package name for the Kedro project the context is
created for.
            project_path: Project path to define the context for.
            config_loader: The ``ConfigLoader`` for reading the project
                configuration.
            hook_manager: The ``PluginManager`` to activate hooks, supplied by the session.
            env: Optional argument for the configuration environment to be used
                for running the pipeline. If not specified, it defaults to "local".
extra_params: Optional dictionary containing extra project parameters.
If specified, will update (and therefore take precedence over)
the parameters retrieved from the project configuration.
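        Example (illustrative only; in a real project ``KedroSession``
        constructs the context for you, and ``config_loader`` and
        ``hook_manager`` below are assumed to be pre-built instances):
        ::

            >>> context = KedroContext(
            >>>     package_name="my_project",
            >>>     project_path="/path/to/my/project",
            >>>     config_loader=config_loader,
            >>>     hook_manager=hook_manager,
            >>> )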
"""
self._project_path = Path(project_path).expanduser().resolve()
self._package_name = package_name
self._config_loader = config_loader
self._env = env
self._extra_params = deepcopy(extra_params)
self._hook_manager = hook_manager
@property # type: ignore
def env(self) -> Optional[str]:
"""Property for the current Kedro environment.
Returns:
Name of the current Kedro environment.
"""
return self._env
@property
def project_path(self) -> Path:
"""Read-only property containing Kedro's root project directory.
Returns:
Project directory.
"""
return self._project_path
@property
def catalog(self) -> DataCatalog:
"""Read-only property referring to Kedro's ``DataCatalog`` for this context.
Returns:
DataCatalog defined in `catalog.yml`.
Raises:
KedroContextError: Incorrect ``DataCatalog`` registered for the project.
"""
return self._get_catalog()
@property
def params(self) -> Dict[str, Any]:
"""Read-only property referring to Kedro's parameters for this context.
Returns:
Parameters defined in `parameters.yml` with the addition of any
extra parameters passed at initialization.
"""
try:
params = self.config_loader["parameters"]
except MissingConfigException as exc:
warn(f"Parameters not found in your Kedro project config.\n{str(exc)}")
params = {}
_update_nested_dict(params, self._extra_params or {})
return params
@property
def config_loader(self):
"""Read-only property referring to Kedro's ``ConfigLoader`` for this
context.
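        Example (mirrors how this module itself reads configuration):
        ::

            >>> conf_catalog = context.config_loader["catalog"]
            >>> conf_params = context.config_loader["parameters"]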
Returns:
Instance of `ConfigLoader`.
Raises:
KedroContextError: Incorrect ``ConfigLoader`` registered for the project.
"""
return self._config_loader
def _get_catalog(
self,
        save_version: Optional[str] = None,
        load_versions: Optional[Dict[str, str]] = None,
) -> DataCatalog:
"""A hook for changing the creation of a DataCatalog instance.
        Args:
            save_version: Version string to be used for ``save`` operations
                by all datasets with versioning enabled.
            load_versions: A mapping of dataset names to the version strings
                to load for each.
        Returns:
            DataCatalog defined in `catalog.yml`.
Raises:
KedroContextError: Incorrect ``DataCatalog`` registered for the project.
"""
# '**/catalog*' reads modular pipeline configs
conf_catalog = self.config_loader["catalog"]
# turn relative paths in conf_catalog into absolute paths
# before initializing the catalog
conf_catalog = _convert_paths_to_absolute_posix(
project_path=self.project_path, conf_dictionary=conf_catalog
)
conf_creds = self._get_config_credentials()
catalog = settings.DATA_CATALOG_CLASS.from_config(
catalog=conf_catalog,
credentials=conf_creds,
load_versions=load_versions,
save_version=save_version,
)
feed_dict = self._get_feed_dict()
catalog.add_feed_dict(feed_dict)
if catalog.layers:
_validate_layers_for_transcoding(catalog)
self._hook_manager.hook.after_catalog_created(
catalog=catalog,
conf_catalog=conf_catalog,
conf_creds=conf_creds,
feed_dict=feed_dict,
save_version=save_version,
load_versions=load_versions,
)
return catalog
def _get_feed_dict(self) -> Dict[str, Any]:
"""Get parameters and return the feed dictionary."""
params = self.params
feed_dict = {"parameters": params}
def _add_param_to_feed_dict(param_name, param_value):
"""This recursively adds parameter paths to the `feed_dict`,
whenever `param_value` is a dictionary itself, so that users can
specify specific nested parameters in their node inputs.
Example:
>>> param_name = "a"
>>> param_value = {"b": 1}
>>> _add_param_to_feed_dict(param_name, param_value)
>>> assert feed_dict["params:a"] == {"b": 1}
>>> assert feed_dict["params:a.b"] == 1
"""
key = f"params:{param_name}"
feed_dict[key] = param_value
if isinstance(param_value, dict):
                for child_key, child_value in param_value.items():
                    _add_param_to_feed_dict(f"{param_name}.{child_key}", child_value)
for param_name, param_value in params.items():
_add_param_to_feed_dict(param_name, param_value)
return feed_dict
def _get_config_credentials(self) -> Dict[str, Any]:
"""Getter for credentials specified in credentials directory."""
try:
conf_creds = self.config_loader["credentials"]
except MissingConfigException as exc:
logging.getLogger(__name__).debug(
"Credentials not found in your Kedro project config.\n %s", str(exc)
)
conf_creds = {}
return conf_creds
class KedroContextError(Exception):
"""Error occurred when loading project and running context pipeline."""