Betfairlightweight API: https://github.com/betcode-org/betfair
要使用该模块,必须传递apicclient数据和登录:
trading = betfairlightweight.APIClient(username, pw, app_key=app_key, cert_files=('blablabla.crt','blablabla.key'))
trading.login()
为了加快数据收集过程,我使用multiprocessing:
from multiprocessing import Pool
trading = betfairlightweight.APIClient(username, pw, app_key=app_key, cert_files=('blablabla.crt','blablabla.key'))
trading.login()
def main():
matches_bf = # DataFrame...
try:
max_process = multiprocessing.cpu_count()-1 or 1
pool = multiprocessing.Pool(max_process)
list_pool = pool.map(data_event, matches_bf.iterrows())
finally:
pool.close()
pool.join()
trading.logout()
def data_event(event_bf):
_, event_bf = event_bf
event_id = event_bf['event_id']
filter_catalog_markets = betfairlightweight.filters.market_filter(
event_ids=[event_id],
market_type_codes = [
'MATCH_ODDS'
]
)
catalog_markets = trading.betting.list_market_catalogue(
filter=filter_catalog_markets,
max_results='100',
sort='FIRST_TO_START',
market_projection=['RUNNER_METADATA']
)
... # some more code
... # some more code
... # some more code
这样就进行了12次登录。对于访问API,这不是理想的方式。
为什么有12个登录?
当我激活代码时,它生成1个登录,当创建多处理池时,它生成11个登录,每个进程一个。如果我把print(trading)
正好放在trading.login()
下面,那么当代码开始运行时,终端上会出现一条print语句,然后在创建池时同时出现另外11条print语句。
所以我需要找到一种方法来做同样的服务,只使用一个登录。
我试图抛出登录在main()
和添加作为一个参数来调用函数:
from multiprocessing import Pool
from itertools import repeat
def main():
trading = betfairlightweight.APIClient(username, pw, app_key=app_key, cert_files=('blablabla.crt','blablabla.key'))
trading.login()
matches_bf = # DataFrame...
try:
max_process = multiprocessing.cpu_count()-1 or 1
pool = multiprocessing.Pool(max_process)
list_pool = pool.map(data_event, zip(repeat(trading),matches_bf.iterrows()))
finally:
pool.close()
pool.join()
trading.logout()
def data_event(trading,event_bf):
trading = trading
_, event_bf = event_bf
event_id = event_bf['event_id']
filter_catalog_markets = betfairlightweight.filters.market_filter(
event_ids=[event_id],
market_type_codes = [
'MATCH_ODDS'
]
)
catalog_markets = trading.betting.list_market_catalogue(
filter=filter_catalog_markets,
max_results='100',
sort='FIRST_TO_START',
market_projection=['RUNNER_METADATA']
)
... # some more code
... # some more code
... # some more code
但是遇到的错误是:
TypeError: cannot pickle 'module' object
我尝试将trading
放入函数data_event
:
from multiprocessing import Pool
def main():
trading = betfairlightweight.APIClient(username, pw, app_key=app_key, cert_files=('blablabla.crt','blablabla.key'))
trading.login()
matches_bf = # DataFrame...
try:
max_process = multiprocessing.cpu_count()-1 or 1
pool = multiprocessing.Pool(max_process)
list_pool = pool.map(data_event, matches_bf.iterrows())
finally:
pool.close()
pool.join()
trading.logout()
def data_event(event_bf):
trading = betfairlightweight.APIClient(username, pw, app_key=app_key, cert_files=('blablabla.crt','blablabla.key'))
_, event_bf = event_bf
event_id = event_bf['event_id']
filter_catalog_markets = betfairlightweight.filters.market_filter(
event_ids=[event_id],
market_type_codes = [
'MATCH_ODDS'
]
)
catalog_markets = trading.betting.list_market_catalogue(
filter=filter_catalog_markets,
max_results='100',
sort='FIRST_TO_START',
market_projection=['RUNNER_METADATA']
)
... # some more code
... # some more code
... # some more code
但是遇到的错误是:
errorCode': 'INVALID_SESSION_INFORMATION'
原因是逻辑的:multiprocessing没有登录。
我应该如何继续,以便我只使用一个登录,可以做我需要的一切,而不必被迫一个接一个地工作(行逐行,没有多处理需要太长时间,不可行)?
附加信息:
- betfair轻量级登录,如果它有助于理解的情况:
设置session
为可pickle的betfairlightweight.APIClient
实例
trading = betfairlightweight.APIClient(
username,
pw,
app_key=app_key,
cert_files=('blablabla.crt','blablabla.key'),
session=requests.Session(), # Add this
)
<标题>TypeError: cannot pickle 'module' object
APIClient
(BaseClient
)默认self.session
为requests
模块。
class APIClient(BaseClient):
...
class BaseClient:
...
def __init__(
self,
username: str,
password: str = None,
app_key: str = None,
certs: str = None,
locale: str = None,
cert_files: Union[Tuple[str], str, None] = None,
lightweight: bool = False,
session: requests.Session = None,
):
...
self.session = session if session else requests
...
标题>每个线程都在执行login
操作,因此将登录逻辑放在通过if __name__ == ...
调用的函数中应该通过屏蔽线程来解决这个问题。
from multiprocessing import Pool
def main():
trading = betfairlightweight.APIClient(username, pw, app_key=app_key, cert_files=('blablabla.crt','blablabla.key'))
trading.login()
trading.keep_alive()
matches_bf = # DataFrame...
try:
max_process = multiprocessing.cpu_count()-1 or 1
pool = multiprocessing.Pool(max_process)
m = multiprocessing.Manager()
queue = m.Queue()
queue.put(trading)
list_pool = pool.starmap(data_event, [(queue, row) for row in matches_bf.iterrows()])
finally:
pool.close()
pool.join()
trading.logout()
def data_event(queue, event_bf):
trading = queue.get()
_, event_bf = event_bf
... # some more code
queue.put(trading)
if __name__ == "main":
main()
编辑
这里的主要问题是,trading
对象(基本上是交易库返回的API客户端)无法序列化,因此multiprocessing
无法pickle它并将其发送给进程。在我看来,没有"直接"解决你的问题;但是,您可以尝试以下两种解决方法:
- 尝试使用pathos代替多处理。multiprocessing,使用
dill
代替pickle
。 - 您可以使用
multiprocessing.pool.ThreadPool
代替multiprocessing.pool.Pool
。由于它与主线程共享内存,因此每个子进程都不需要创建新的交易对象。与Pool
相比,这个可能会对性能造成影响。