"""``APIDataSet`` loads the data from HTTP(S) APIs.
It uses the python requests library: https://requests.readthedocs.io/en/master/
"""
from typing import Any, Dict, Iterable, List, Union
import requests
from requests.auth import AuthBase
from kedro.io.core import AbstractDataSet, DataSetError
[docs]class APIDataSet(AbstractDataSet):
"""``APIDataSet`` loads the data from HTTP(S) APIs.
It uses the python requests library: https://requests.readthedocs.io/en/master/
Example:
::
>>> from kedro.extras.datasets.api import APIDataSet
>>>
>>>
>>> data_set = APIDataSet(
>>> url="https://quickstats.nass.usda.gov",
>>> params={
>>> "key": "SOME_TOKEN",
>>> "format": "JSON",
>>> "commodity_desc": "CORN",
>>> "statisticcat_des": "YIELD",
>>> "agg_level_desc": "STATE",
>>> "year": 2000
>>> }
>>> )
>>> data = data_set.load()
"""
# pylint: disable=too-many-arguments
[docs] def __init__(
self,
url: str,
method: str = "GET",
data: Any = None,
params: Dict[str, Any] = None,
headers: Dict[str, Any] = None,
auth: Union[Iterable[str], AuthBase] = None,
json: Union[List, Dict[str, Any]] = None,
timeout: int = 60,
credentials: Union[Iterable[str], AuthBase] = None,
) -> None:
"""Creates a new instance of ``APIDataSet`` to fetch data from an API endpoint.
Args:
url: The API URL endpoint.
method: The Method of the request, GET, POST, PUT, DELETE, HEAD, etc...
data: The request payload, used for POST, PUT, etc requests
https://requests.readthedocs.io/en/master/user/quickstart/#more-complicated-post-requests
params: The url parameters of the API.
https://requests.readthedocs.io/en/master/user/quickstart/#passing-parameters-in-urls
headers: The HTTP headers.
https://requests.readthedocs.io/en/master/user/quickstart/#custom-headers
auth: Anything ``requests`` accepts. Normally it's either ``('login', 'password')``,
or ``AuthBase``, ``HTTPBasicAuth`` instance for more complex cases. Any
iterable will be cast to a tuple.
json: The request payload, used for POST, PUT, etc requests, passed in
to the json kwarg in the requests object.
https://requests.readthedocs.io/en/master/user/quickstart/#more-complicated-post-requests
timeout: The wait time in seconds for a response, defaults to 1 minute.
https://requests.readthedocs.io/en/master/user/quickstart/#timeouts
credentials: same as ``auth``. Allows specifying ``auth`` secrets in
credentials.yml.
Raises:
ValueError: if both ``credentials`` and ``auth`` are specified.
"""
super().__init__()
if credentials is not None and auth is not None:
raise ValueError("Cannot specify both auth and credentials.")
auth = credentials or auth
if isinstance(auth, Iterable):
auth = tuple(auth)
self._request_args: Dict[str, Any] = {
"url": url,
"method": method,
"data": data,
"params": params,
"headers": headers,
"auth": auth,
"json": json,
"timeout": timeout,
}
def _describe(self) -> Dict[str, Any]:
return dict(**self._request_args)
def _execute_request(self) -> requests.Response:
try:
response = requests.request(**self._request_args)
response.raise_for_status()
except requests.exceptions.HTTPError as exc:
raise DataSetError("Failed to fetch data", exc) from exc
except OSError as exc:
raise DataSetError("Failed to connect to the remote server") from exc
return response
def _load(self) -> requests.Response:
return self._execute_request()
def _save(self, data: Any) -> None:
raise DataSetError(f"{self.__class__.__name__} is a read only data set type")
def _exists(self) -> bool:
response = self._execute_request()
return response.ok