代码:
url = 'ws://xx.xx.xx.xx:1234'
ws = create_connection(url)
ws.send(json.dumps(subscribe_msg))
ws.recv()
while True:
result = ws.recv()
# handle the result using a different core each time
handle_parallely(result)
while循环result=ws.recv()
需要并发,这样就可以重复调用ws.recv
,而无需等待handle_parallely
返回。handle_parallely
在被调用时需要并行运行。
接收到的数据及其处理独立于任何以前或将来的数据。
您可以使用并发期货模块中的ProcessPoolExecutor。这可能看起来像
from concurrent.futures import ProcessPoolExecutor
max_number_of_processes = 4 # just put any number here
futures = []
with ProcessPoolExecutor(max_worker=max_number_of_processes) as executor:
while True:
result = ws.recv()
# handle the result using a different core each time
future = executor.submit(handle_parallely, result)
futures.append(future)
futures = [f for f in futures if not f.done()]
当然,只有当result和handle_parallely是可拾取的时,这才有效。如果您遇到PickelingError问题,请参阅下面的内容,默认情况下哪些类型是可拾取。
将未来存储在该列表中是可选的,但也许您希望跟踪对它们的引用。