Source code for kedro.extras.datasets.api.api_dataset

"""``APIDataSet`` loads the data from HTTP(S) APIs.
It uses the python requests library: https://requests.readthedocs.io/en/master/
"""
from typing import Any, Dict, Iterable, List, Union

import requests
from requests.auth import AuthBase

from kedro.io.core import AbstractDataSet, DataSetError


class APIDataSet(AbstractDataSet):
    """``APIDataSet`` loads the data from HTTP(S) APIs.
    It uses the python requests library: https://requests.readthedocs.io/en/master/

    The data set is read only: ``_load`` returns the ``requests.Response``
    object produced by the request and ``_save`` always raises
    ``DataSetError``.

    Example:
    ::

        >>> from kedro.extras.datasets.api import APIDataSet
        >>>
        >>>
        >>> data_set = APIDataSet(
        >>>     url="https://quickstats.nass.usda.gov",
        >>>     params={
        >>>         "key": "SOME_TOKEN",
        >>>         "format": "JSON",
        >>>         "commodity_desc": "CORN",
        >>>         "statisticcat_des": "YIELD",
        >>>         "agg_level_desc": "STATE",
        >>>         "year": 2000
        >>>     }
        >>> )
        >>> data = data_set.load()
    """

    # pylint: disable=too-many-arguments
    def __init__(
        self,
        url: str,
        method: str = "GET",
        data: Any = None,
        params: Dict[str, Any] = None,
        headers: Dict[str, Any] = None,
        auth: Union[Iterable[str], AuthBase] = None,
        json: Union[List, Dict[str, Any]] = None,
        timeout: int = 60,
        credentials: Union[Iterable[str], AuthBase] = None,
    ) -> None:
        """Creates a new instance of ``APIDataSet`` to fetch data from an API endpoint.

        Args:
            url: The API URL endpoint.
            method: The Method of the request, GET, POST, PUT, DELETE, HEAD, etc...
            data: The request payload, used for POST, PUT, etc requests
                https://requests.readthedocs.io/en/master/user/quickstart/#more-complicated-post-requests
            params: The url parameters of the API.
                https://requests.readthedocs.io/en/master/user/quickstart/#passing-parameters-in-urls
            headers: The HTTP headers.
                https://requests.readthedocs.io/en/master/user/quickstart/#custom-headers
            auth: Anything ``requests`` accepts. Normally it's either
                ``('login', 'password')``, or ``AuthBase``, ``HTTPBasicAuth``
                instance for more complex cases. Any iterable will be cast
                to a tuple.
            json: The request payload, used for POST, PUT, etc requests, passed in
                to the json kwarg in the requests object.
                https://requests.readthedocs.io/en/master/user/quickstart/#more-complicated-post-requests
            timeout: The wait time in seconds for a response, defaults to 1 minute.
                https://requests.readthedocs.io/en/master/user/quickstart/#timeouts
            credentials: same as ``auth``. Allows specifying ``auth`` secrets in
                credentials.yml.

        Raises:
            ValueError: if both ``credentials`` and ``auth`` are specified.
        """
        super().__init__()

        if credentials is not None and auth is not None:
            raise ValueError("Cannot specify both auth and credentials.")

        # At most one of the two is set at this point, so ``or`` simply picks
        # whichever was supplied (or leaves ``auth`` as it was).
        auth = credentials or auth

        # Lists and other iterables (e.g. parsed from YAML) are normalised to
        # a tuple before being handed to ``requests``.
        if isinstance(auth, Iterable):
            auth = tuple(auth)

        # Keyword arguments forwarded verbatim to ``requests.request``.
        self._request_args: Dict[str, Any] = {
            "url": url,
            "method": method,
            "data": data,
            "params": params,
            "headers": headers,
            "auth": auth,
            "json": json,
            "timeout": timeout,
        }

    def _describe(self) -> Dict[str, Any]:
        """Returns a copy of the request arguments describing this data set.

        The ``auth`` entry is left out on purpose: it may carry secrets
        (login/password or a token-bearing ``AuthBase``) and a description
        is meant to be displayed, not used to perform the request.

        Returns:
            A new dictionary with every request argument except ``auth``.
        """
        described = dict(self._request_args)
        described.pop("auth", None)
        return described

    def _execute_request(self) -> requests.Response:
        """Performs the configured HTTP request.

        Returns:
            The ``requests.Response`` of a request whose status check passed.

        Raises:
            DataSetError: if ``raise_for_status`` reports an HTTP error, or
                if an ``OSError`` is raised while performing the request.
        """
        try:
            response = requests.request(**self._request_args)
            response.raise_for_status()
        # Handled before ``OSError`` so HTTP status errors keep their own message.
        except requests.exceptions.HTTPError as exc:
            raise DataSetError("Failed to fetch data", exc) from exc
        except OSError as exc:
            raise DataSetError("Failed to connect to the remote server") from exc

        return response

    def _load(self) -> requests.Response:
        """Executes the request and returns the resulting ``requests.Response``."""
        return self._execute_request()

    def _save(self, data: Any) -> None:
        """Always raises, since this data set cannot be written to.

        Raises:
            DataSetError: unconditionally.
        """
        raise DataSetError(f"{self.__class__.__name__} is a read only data set type")

    def _exists(self) -> bool:
        """Executes the request and reports whether the response is ``ok``.

        Failures detected by ``_execute_request`` are raised as
        ``DataSetError`` before this method gets to return a value.
        """
        response = self._execute_request()

        return response.ok