Source code for kedro.extras.datasets.api.api_dataset

"""``APIDataSet`` loads the data from HTTP(S) APIs.
It uses the python requests library: https://requests.readthedocs.io/en/latest/
"""
from typing import Any, Dict, Iterable, List, NoReturn, Union

import requests
from requests.auth import AuthBase

from kedro.io.core import AbstractDataSet, DataSetError

# NOTE: kedro.extras.datasets will be removed in Kedro 0.19.0.
# Any contribution to datasets should be made in kedro-datasets
# in kedro-plugins (https://github.com/kedro-org/kedro-plugins)


class APIDataSet(AbstractDataSet[None, requests.Response]):
    """``APIDataSet`` loads the data from HTTP(S) APIs.
    It uses the python requests library: https://requests.readthedocs.io/en/latest/

    The data set is read only: ``load()`` returns the ``requests.Response``
    object of the configured request and ``save()`` always raises
    ``DataSetError``.

    Example usage for the
    `YAML API <https://kedro.readthedocs.io/en/stable/data/\
    data_catalog.html#use-the-data-catalog-with-the-yaml-api>`_:

    .. code-block:: yaml

        usda:
          type: api.APIDataSet
          url: https://quickstats.nass.usda.gov
          params:
            key: SOME_TOKEN
            format: JSON
            commodity_desc: CORN
            statisticcat_des: YIELD
            agg_level_desc: STATE
            year: 2000

    Example usage for the
    `Python API <https://kedro.readthedocs.io/en/stable/data/\
    data_catalog.html#use-the-data-catalog-with-the-code-api>`_:
    ::

        >>> from kedro.extras.datasets.api import APIDataSet
        >>>
        >>>
        >>> data_set = APIDataSet(
        >>>     url="https://quickstats.nass.usda.gov",
        >>>     params={
        >>>         "key": "SOME_TOKEN",
        >>>         "format": "JSON",
        >>>         "commodity_desc": "CORN",
        >>>         "statisticcat_des": "YIELD",
        >>>         "agg_level_desc": "STATE",
        >>>         "year": 2000
        >>>     }
        >>> )
        >>> data = data_set.load()
    """

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        url: str,
        method: str = "GET",
        data: Any = None,
        params: Dict[str, Any] = None,
        headers: Dict[str, Any] = None,
        auth: Union[Iterable[str], AuthBase] = None,
        json: Union[List, Dict[str, Any]] = None,
        timeout: int = 60,
        credentials: Union[Iterable[str], AuthBase] = None,
    ) -> None:
        """Creates a new instance of ``APIDataSet`` to fetch data from an API endpoint.

        Args:
            url: The API URL endpoint.
            method: The Method of the request, GET, POST, PUT, DELETE, HEAD, etc...
            data: The request payload, used for POST, PUT, etc requests
                https://requests.readthedocs.io/en/latest/user/quickstart/#more-complicated-post-requests
            params: The url parameters of the API.
                https://requests.readthedocs.io/en/latest/user/quickstart/#passing-parameters-in-urls
            headers: The HTTP headers.
                https://requests.readthedocs.io/en/latest/user/quickstart/#custom-headers
            auth: Anything ``requests`` accepts. Normally it's either ``('login', 'password')``,
                or ``AuthBase``, ``HTTPBasicAuth`` instance for more complex cases. Any
                iterable will be cast to a tuple.
            json: The request payload, used for POST, PUT, etc requests, passed in
                to the json kwarg in the requests object.
                https://requests.readthedocs.io/en/latest/user/quickstart/#more-complicated-post-requests
            timeout: The wait time in seconds for a response, defaults to 1 minute.
                https://requests.readthedocs.io/en/latest/user/quickstart/#timeouts
            credentials: same as ``auth``. Allows specifying ``auth`` secrets in
                credentials.yml.

        Raises:
            ValueError: if both ``credentials`` and ``auth`` are specified.
        """
        super().__init__()

        if credentials is not None and auth is not None:
            raise ValueError("Cannot specify both auth and credentials.")

        # ``credentials`` is only an alias for ``auth``; whichever was given wins.
        auth = credentials or auth

        # Iterables (e.g. a list coming from a YAML file) are normalised to a tuple.
        if isinstance(auth, Iterable):
            auth = tuple(auth)

        # Keyword arguments forwarded verbatim to ``requests.request`` on every call.
        self._request_args: Dict[str, Any] = {
            "url": url,
            "method": method,
            "data": data,
            "params": params,
            "headers": headers,
            "auth": auth,
            "json": json,
            "timeout": timeout,
        }

    def _describe(self) -> Dict[str, Any]:
        """Returns a copy of the request arguments with ``auth`` left out.

        The description is meant for display, so the ``auth`` entry, which may
        hold secrets supplied through ``credentials``, is deliberately excluded
        to keep it out of printed or logged output.
        """
        return {
            key: value for key, value in self._request_args.items() if key != "auth"
        }

    def _execute_request(self) -> requests.Response:
        """Sends the configured request and returns the response.

        Raises:
            DataSetError: if the response carries an HTTP error status, or if
                the request fails with an ``OSError`` (e.g. a connection problem).
        """
        try:
            response = requests.request(**self._request_args)
            response.raise_for_status()
        except requests.exceptions.HTTPError as exc:
            raise DataSetError("Failed to fetch data", exc) from exc
        except OSError as exc:
            raise DataSetError("Failed to connect to the remote server") from exc

        return response

    def _load(self) -> requests.Response:
        """Performs the request and returns the resulting ``requests.Response``."""
        return self._execute_request()

    def _save(self, data: None) -> NoReturn:
        """Always raises, because this data set type is read only.

        Raises:
            DataSetError: unconditionally.
        """
        raise DataSetError(f"{self.__class__.__name__} is a read only data set type")

    def _exists(self) -> bool:
        """Performs the request and reports whether the response is ``ok``.

        The request goes through ``_execute_request``, so an HTTP error status
        or a connection failure surfaces as ``DataSetError`` rather than as a
        ``False`` return value.
        """
        response = self._execute_request()
        return response.ok