Error when trying to send APIClient data to a function that uses multiprocessing



Betfairlightweight API: https://github.com/betcode-org/betfair

To use the module, you have to create an APIClient with your credentials and log in:

trading = betfairlightweight.APIClient(username, pw, app_key=app_key, cert_files=('blablabla.crt','blablabla.key'))
trading.login()

To speed up data collection, I use multiprocessing:

import multiprocessing
import betfairlightweight

trading = betfairlightweight.APIClient(username, pw, app_key=app_key, cert_files=('blablabla.crt','blablabla.key'))
trading.login()

def main():
    matches_bf = ...  # DataFrame...
    max_process = multiprocessing.cpu_count() - 1 or 1
    pool = multiprocessing.Pool(max_process)
    try:
        list_pool = pool.map(data_event, matches_bf.iterrows())
    finally:
        pool.close()
        pool.join()
    trading.logout()

def data_event(event_bf):
    _, event_bf = event_bf  # iterrows() yields (index, row) pairs
    event_id = event_bf['event_id']
    filter_catalog_markets = betfairlightweight.filters.market_filter(
        event_ids=[event_id],
        market_type_codes=['MATCH_ODDS'],
    )
    catalog_markets = trading.betting.list_market_catalogue(
        filter=filter_catalog_markets,
        max_results='100',
        sort='FIRST_TO_START',
        market_projection=['RUNNER_METADATA']
    )
    ... # some more code
    ... # some more code
    ... # some more code

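This results in 12 logins, which is not an ideal way to access the API.

Why are there 12 logins?

When I run the code it performs 1 login, and when the multiprocessing pool is created it performs 11 more, one per process. If I put print(trading) directly below trading.login(), one print appears in the terminal when the code starts, and then 11 more appear at the same time when the pool is created.

So I need to find a way to do the same job using only one login.

I tried moving the login into main() and passing the client as an argument to the called function: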

import multiprocessing
from itertools import repeat
import betfairlightweight

def main():
    trading = betfairlightweight.APIClient(username, pw, app_key=app_key, cert_files=('blablabla.crt','blablabla.key'))
    trading.login()
    matches_bf = ...  # DataFrame...
    max_process = multiprocessing.cpu_count() - 1 or 1
    pool = multiprocessing.Pool(max_process)
    try:
        # starmap unpacks each (trading, row) tuple into data_event's two parameters
        list_pool = pool.starmap(data_event, zip(repeat(trading), matches_bf.iterrows()))
    finally:
        pool.close()
        pool.join()
    trading.logout()

def data_event(trading, event_bf):
    _, event_bf = event_bf
    event_id = event_bf['event_id']
    filter_catalog_markets = betfairlightweight.filters.market_filter(
        event_ids=[event_id],
        market_type_codes=['MATCH_ODDS'],
    )
    catalog_markets = trading.betting.list_market_catalogue(
        filter=filter_catalog_markets,
        max_results='100',
        sort='FIRST_TO_START',
        market_projection=['RUNNER_METADATA']
    )
    ... # some more code
    ... # some more code
    ... # some more code

But I got this error:

TypeError: cannot pickle 'module' object

I then tried creating trading inside the data_event function:

import multiprocessing
import betfairlightweight

def main():
    trading = betfairlightweight.APIClient(username, pw, app_key=app_key, cert_files=('blablabla.crt','blablabla.key'))
    trading.login()
    matches_bf = ...  # DataFrame...
    max_process = multiprocessing.cpu_count() - 1 or 1
    pool = multiprocessing.Pool(max_process)
    try:
        list_pool = pool.map(data_event, matches_bf.iterrows())
    finally:
        pool.close()
        pool.join()
    trading.logout()

def data_event(event_bf):
    # A new client in each worker; it is never logged in
    trading = betfairlightweight.APIClient(username, pw, app_key=app_key, cert_files=('blablabla.crt','blablabla.key'))
    _, event_bf = event_bf
    event_id = event_bf['event_id']
    filter_catalog_markets = betfairlightweight.filters.market_filter(
        event_ids=[event_id],
        market_type_codes=['MATCH_ODDS'],
    )
    catalog_markets = trading.betting.list_market_catalogue(
        filter=filter_catalog_markets,
        max_results='100',
        sort='FIRST_TO_START',
        market_projection=['RUNNER_METADATA']
    )
    ... # some more code
    ... # some more code
    ... # some more code

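But I got this error:

'errorCode': 'INVALID_SESSION_INFORMATION'

The reason makes sense: the client in each worker process has not logged in.

How should I proceed so that I use only one login and can still do everything I need, without being forced to process the rows one by one (row by row without multiprocessing takes too long and is not feasible)?

Additional information:

  • betfairlightweight login, in case it helps to understand the situation:

Pass a session to make the betfairlightweight.APIClient instance picklable: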

import requests
import betfairlightweight

trading = betfairlightweight.APIClient(
    username,
    pw,
    app_key=app_key,
    cert_files=('blablabla.crt','blablabla.key'),
    session=requests.Session(),  # Add this
)

TypeError: cannot pickle 'module' object

APIClient (via BaseClient) sets self.session to the requests module by default, and a module object cannot be pickled:

class APIClient(BaseClient):
    ...
class BaseClient:
    ...
    def __init__(
        self,
        username: str,
        password: str = None,
        app_key: str = None,
        certs: str = None,
        locale: str = None,
        cert_files: Union[Tuple[str], str, None] = None,
        lightweight: bool = False,
        session: requests.Session = None,
    ):
        ...
        self.session = session if session else requests
        ...
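The failure can be reproduced without Betfair at all. The sketch below uses a hypothetical ToyClient that copies the `session if session else requests` default from BaseClient, with the stdlib json module standing in for requests and a hypothetical ToySession class standing in for requests.Session. Pickling the client fails while a module is stored on it and succeeds once a session instance is passed.

```python
import json
import pickle


class ToySession:
    """Hypothetical stand-in for requests.Session: a plain, picklable object."""

    def __init__(self):
        self.headers = {}


class ToyClient:
    """Hypothetical stand-in for BaseClient's session handling."""

    def __init__(self, username, session=None):
        self.username = username
        # Same pattern as BaseClient: fall back to a *module* when no session is given.
        self.session = session if session else json


def can_pickle(obj):
    """Return True if obj survives pickle.dumps, False if pickling raises TypeError."""
    try:
        pickle.dumps(obj)
        return True
    except TypeError as exc:
        print("pickle failed:", exc)
        return False


default_client = ToyClient("user")                        # session is the json module
session_client = ToyClient("user", session=ToySession())  # session is an instance

print(can_pickle(default_client))  # False (the TypeError message is printed first)
print(can_pickle(session_client))  # True
```

This only shows why the session argument matters for pickling; whether a pickled copy of a real logged-in client keeps a usable session token in each worker is an assumption to verify against the library.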

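Each process is performing the login, so putting the login logic in a function that is only called via if __name__ == ... should solve this by shielding the child processes from it.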

import multiprocessing
import betfairlightweight

def main():
    trading = betfairlightweight.APIClient(username, pw, app_key=app_key, cert_files=('blablabla.crt','blablabla.key'))
    trading.login()
    trading.keep_alive()
    matches_bf = ...  # DataFrame...
    max_process = multiprocessing.cpu_count() - 1 or 1
    pool = multiprocessing.Pool(max_process)
    try:
        m = multiprocessing.Manager()
        queue = m.Queue()
        queue.put(trading)  # trading is pickled here, so it needs a picklable session
        list_pool = pool.starmap(data_event, [(queue, row) for row in matches_bf.iterrows()])
    finally:
        pool.close()
        pool.join()
    trading.logout()

def data_event(queue, event_bf):
    trading = queue.get()  # blocks until the single shared client is free
    _, event_bf = event_bf
    ... # some more code
    queue.put(trading)

if __name__ == "__main__":
    main()

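Edit

The main problem here is that the trading object (essentially the API client returned by the trading library) cannot be serialized, so multiprocessing cannot pickle it and send it to the worker processes. In my opinion there is no "direct" solution to your problem; however, you can try the following two workarounds:

  1. Try using pathos instead of multiprocessing: pathos.multiprocessing uses dill instead of pickle.
  2. You can use multiprocessing.pool.ThreadPool instead of multiprocessing.pool.Pool. Because the worker threads share memory with the main thread, they do not need to create new trading objects. Compared with Pool, this may cost some performance.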
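A minimal sketch of the ThreadPool workaround, assuming a hypothetical FakeTradingClient in place of betfairlightweight.APIClient so it runs without credentials; in real code the worker would call trading.betting.list_market_catalogue(...) on the shared client. ThreadPool hands tasks to threads in the same process, so nothing is pickled and the single login is reused by every worker.

```python
import threading
from multiprocessing.pool import ThreadPool


class FakeTradingClient:
    """Hypothetical stand-in for betfairlightweight.APIClient (no network calls)."""

    def __init__(self):
        self.login_count = 0
        self._lock = threading.Lock()

    def login(self):
        with self._lock:
            self.login_count += 1

    def list_market_catalogue(self, event_id):
        # Placeholder for trading.betting.list_market_catalogue(...)
        return {"event_id": event_id, "market_type": "MATCH_ODDS"}

    def logout(self):
        pass


def data_event(trading, event_id):
    # Every worker thread reuses the same, already logged-in client object.
    return trading.list_market_catalogue(event_id)


def main(event_ids, workers=4):
    trading = FakeTradingClient()
    trading.login()  # exactly one login for the whole run
    try:
        # ThreadPool does not pickle task arguments, so the client is shared as-is.
        with ThreadPool(workers) as pool:
            results = pool.starmap(data_event, [(trading, eid) for eid in event_ids])
    finally:
        trading.logout()
    return trading, results


if __name__ == "__main__":
    client, results = main(["1.1", "1.2", "1.3"])
    print(client.login_count)  # 1
    print(len(results))        # 3
```

Since the Betfair requests are network I/O, threads spend most of their time waiting on sockets, during which the GIL is released; for CPU-heavy post-processing of the results, Pool may still be faster.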
