Problem applying an API call to a large DataFrame



For each request, I call an API like this:

import json
import requests

def foo(input):
    payload = {'key': '', 'in': input, 'fj': 'm'}
    r = requests.get('https://api.example.com/api', params=payload)
    res = json.loads(r.text)
    return res

I also have a large pandas DataFrame like this:

    ColA
0   The quick  fox jumps over the lazy 
1   The quick  fox  over the lazy dog
2   The quick brown fox jumps over the lazy dog
....
n   The  brown fox jumps over the  dog
Then I wanted to apply `foo` to that large pandas DataFrame, so I tried:

df['result'] = df[['ColA']].apply(foo, axis=1)

With the approach above it never finishes, so I tried this instead:

df['result'] = df['ColA'].apply(foo)

The problem is that the API doesn't receive anything; moreover, I get the following exception:

JSONDecodeError: Expecting value: line 1 column 1 (char 0)
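That error usually means the response body is not JSON at all: an empty body, an HTML error page, or a rate-limit message. A defensive parse helper can make this visible instead of crashing mid-column (a generic sketch, not specific to this API; `parse_response` is an illustrative name):

```python
import json

def parse_response(status_code, text):
    """Return parsed JSON, or None when the body is not valid JSON."""
    if status_code != 200 or not text:
        return None
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return None

# A valid JSON body parses normally; anything else yields None.
print(parse_response(200, '{"ok": true}'))               # {'ok': True}
print(parse_response(200, '<html>rate limited</html>'))  # None
```

Counting how many rows come back as `None` tells you whether the API is rejecting everything or only occasional requests.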

I also tried:

docs = df['ColA'].values.tolist()
list(map(foo, docs))

I still have the same problem. Any ideas on how to efficiently pass a pandas column to the API?
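One easy speed-up, independent of how the column is iterated, is reusing a single `requests.Session` so every call shares one TCP connection instead of opening a new one per row. A sketch of the `foo` above rewritten that way (the URL and parameters are the hypothetical ones from the question):

```python
import requests

session = requests.Session()  # reuses the underlying connection across calls

def foo(text, session=session):
    payload = {'key': '', 'in': text, 'fj': 'm'}
    r = session.get('https://api.example.com/api', params=payload)
    return r.text
```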

Update

After trying multiprocessing, I noticed I was getting a JSONDecodeError: Expecting value: line 1 column 1 (char 0) error. So I guess this situation is related to a caching issue; my question is, if it is cache related, how can I fix it?
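Under parallel load this decode error is more often rate limiting or a transient empty body than caching; one pragmatic mitigation (a generic sketch, not specific to this API; `call_with_retry` is an illustrative name) is to retry a failing call a few times with backoff before giving up:

```python
import time

def call_with_retry(func, arg, attempts=3, delay=0.0):
    """Retry a flaky call; return None if every attempt fails."""
    for i in range(attempts):
        try:
            return func(arg)
        except ValueError:  # json.JSONDecodeError subclasses ValueError
            if i < attempts - 1:
                time.sleep(delay * (2 ** i))  # exponential backoff
    return None
```

Wrapping each worker's request in `call_with_retry` turns occasional bad responses into a `None` you can filter out afterwards, rather than a crash that kills the whole `Pool.map`.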

Update 2

---------------------------------------------------------------------------
RemoteTraceback                           Traceback (most recent call last)
RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/usr/local/Cellar/python3/3.5.2_2/Frameworks/Python.framework/Versions/3.5/lib/python3.5/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "/usr/local/Cellar/python3/3.5.2_2/Frameworks/Python.framework/Versions/3.5/lib/python3.5/multiprocessing/pool.py", line 44, in mapstar
    return list(map(*args))
  File "<ipython-input-3-7d058c7b9ac1>", line 9, in get_data
    data = json.loads(r.text)
  File "/usr/local/Cellar/python3/3.5.2_2/Frameworks/Python.framework/Versions/3.5/lib/python3.5/json/__init__.py", line 319, in loads
    return _default_decoder.decode(s)
  File "/usr/local/Cellar/python3/3.5.2_2/Frameworks/Python.framework/Versions/3.5/lib/python3.5/json/decoder.py", line 339, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/local/Cellar/python3/3.5.2_2/Frameworks/Python.framework/Versions/3.5/lib/python3.5/json/decoder.py", line 357, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
"""
The above exception was the direct cause of the following exception:
JSONDecodeError                           Traceback (most recent call last)
<ipython-input-11-6bb417b3ed92> in <module>()
      3 p = Pool(5)
      4 # get data/response only for _unique_ strings (parameters)
----> 5 rslt = pd.Series(p.map(get_data, df2['sents'].unique().tolist()),index=df['sents'].unique())
      6 # map responses back to DF (it'll take care of duplicates)
      7 df['new'] = df2['ColA'].map(rslt)
/usr/local/Cellar/python3/3.5.2_2/Frameworks/Python.framework/Versions/3.5/lib/python3.5/multiprocessing/pool.py in map(self, func, iterable, chunksize)
    258         in a list that is returned.
    259         '''
--> 260         return self._map_async(func, iterable, mapstar, chunksize).get()
    261 
    262     def starmap(self, func, iterable, chunksize=None):
/usr/local/Cellar/python3/3.5.2_2/Frameworks/Python.framework/Versions/3.5/lib/python3.5/multiprocessing/pool.py in get(self, timeout)
    606             return self._value
    607         else:
--> 608             raise self._value
    609 
    610     def _set(self, i, obj):
JSONDecodeError: Expecting value: line 1 column 1 (char 0)

Inspired by @GauthierFeuillen's answer, I wanted to tweak it to be more pandas friendly:

import pandas as pd
import numpy as np
from multiprocessing import Pool
import requests
url = 'https://api.example.com/api'
df = pd.read_csv("data.csv")
def get_data(text, url=url):
    r = requests.get(url,
                     params={'key': '<YOUR KEY>',
                             'in': text,
                             'fj': 'm'})
    if r.status_code != requests.codes.ok:
        return np.nan
    return r.text
if __name__ == '__main__':
    p = Pool(5)
    # get data/response only for _unique_ strings (parameters)
    rslt = pd.Series(p.map(get_data, df['ColA'].unique().tolist()),
                     index=df['ColA'].unique())
    # map responses back to DF (it'll take care of duplicates)
    df['new'] = df['ColA'].map(rslt)
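The unique-then-map trick above can be seen in isolation with a stub in place of the network call: each distinct string is processed once, and `Series.map` fans the results back out to duplicate rows (names here are illustrative):

```python
import pandas as pd

df = pd.DataFrame({'ColA': ['a', 'b', 'a', 'c', 'b']})

def fake_api(text):
    # stand-in for the real request so the sketch runs offline
    return text.upper()

# call once per unique value, then map results back onto all rows
uniq = df['ColA'].unique()
rslt = pd.Series([fake_api(t) for t in uniq], index=uniq)
df['new'] = df['ColA'].map(rslt)
print(df['new'].tolist())  # ['A', 'B', 'A', 'C', 'B']
```

With repetitive text columns this alone can cut the number of API calls dramatically, before any parallelism is involved.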

This should fit your needs:

import pandas as pd
from multiprocessing import Pool
import requests
df = pd.read_csv("data.csv")
def getLink(link):
    return requests.get(link).text
if __name__ == '__main__':
    p = Pool(5)
    print (p.map(getLink, df["link"]))

Just change it to fit your needs (here I only fetch the text from the URL). But the real idea is to use the multiprocessing package to parallelize the work :)
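Since the work here is I/O-bound (waiting on HTTP responses), threads are usually a lighter alternative to processes; a standard-library sketch with a stub in place of `requests.get(link).text` so it runs offline:

```python
from concurrent.futures import ThreadPoolExecutor

def getLink(link):
    # stand-in for requests.get(link).text
    return 'fetched:' + link

links = ['http://a', 'http://b', 'http://c']
with ThreadPoolExecutor(max_workers=5) as ex:
    # executor.map preserves the input order of the results
    results = list(ex.map(getLink, links))
print(results)  # ['fetched:http://a', 'fetched:http://b', 'fetched:http://c']
```

Threads avoid the pickling and process-startup overhead of `multiprocessing.Pool`, and the GIL is released while each thread waits on the network.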
