问题:我想从发送JSON响应的分页API中检索数据。使用kedro.extras.datasets.api.APIDataSet
,我可以查询API并检索初始响应。但是,如果每个API请求的结果多于大小限制,则需要遍历JSON响应中的分页链接。有人已经成功地做到了吗?
我应该子类APIDataSet为此,并把链接遍历逻辑在_execute_request()方法?提供的APIDataSet返回请求。响应对象。子类化的APIDataSet应该直接返回(或产生)结果吗?
我尝试了这种方法,它可以检索数据。但我不确定这是否是"kedro方式"。去做。遍历逻辑应该在节点中完成吗?
import copy
from typing import Any, Dict, Iterable, List, Union
import dpath.util
import requests
from kedro.extras.datasets.api import APIDataSet
from kedro.io.core import DataSetError
from requests.auth import AuthBase
class PaginatedJSONAPIDataSet(APIDataSet):
def __init__(
self,
url: str,
method: str = "GET",
data: Any = None,
params: Dict[str, Any] = None,
headers: Dict[str, Any] = None,
auth: Union[Iterable[str], AuthBase] = None,
json: Union[List, Dict[str, Any]] = None,
timeout: int = 60,
credentials: Union[Iterable[str], AuthBase] = None,
items_path: str = None,
next_link_path: str = None, # multiple keys possible to access next link in nested json, separate with "/", like "key1/key2"
):
super().__init__(
url, method, data, params, headers, auth, json, timeout, credentials
)
self.items_path = items_path
self.next_link_path = next_link_path
def _execute_request(self) -> List[Dict[str, Any]]:
# initial request
try:
response = requests.request(**self._request_args)
response.raise_for_status()
except requests.exceptions.HTTPError as exc:
raise DataSetError("Failed to fetch data", exc) from exc
except OSError as exc:
raise DataSetError("Failed to connect to the remote server") from exc
request_args = copy.deepcopy(self._request_args)
request_args.pop("params")
hits = []
# pagination traversal
while True:
hits.extend(dpath.util.get(response.json(), self.items_path))
try:
next_link = dpath.util.get(response.json(), self.next_link_path)
# next link key is not present in json response
except KeyError:
break
# next link key is present, but value is null / None
if next_link is None:
break
request_args["url"] = next_link
response = requests.request(**request_args)
return hits
# toy example with a paginated API, to demonstrate pagination traversal
data_set = PaginatedJSONAPIDataSet(
url="https://pokeapi.co/api/v2/pokemon",
items_path="results",
next_link_path="next",
params={
"limit": 500
}
)
data = data_set.load()
print(type(data)) # <class 'list'>
print(len(data)) # 1126
print(data[0]) # {'name': 'bulbasaur', 'url': 'https://pokeapi.co/api/v2/pokemon/1/'}
有人能给我一个提示,如果他们做了类似的事情或指给我一个最佳实践的例子(我找不到一个)?
你必须定义一个自定义数据集,它应该很容易采用现有的实现和扩展/覆盖来处理分页部分。
我们希望在主要项目中加入PR,因为我认为这对其他用户很有用,令人惊讶的是(据我所知)以前没有出现过。