So I ended up with a piece of code like this:
import asyncio
import multiprocessing as MP
import pickle
import time

mgr = MP.Manager()
mp_dataset = mgr.dict(dataset)
mp_seen = mgr.dict({k: None for k in seen})
mp_barrier = MP.Barrier(WORKER_COUNT + 1)  # +1 to include main process
# TileQueue is a global var
workers = [
    MP.Process(target=process_item_worker, args=(TileQueue, mp_dataset, mp_seen, mp_barrier))
    for _ in range(0, WORKER_COUNT)
]
[worker.start() for worker in workers]
print("Waiting for workers...")
mp_barrier.wait()
start_t = time.monotonic()
try:
    asyncio.run(fetch_more_data())
    elapsed_t = time.monotonic() - start_t
    print(f"\nFetching finished in {elapsed_t:,.2f} seconds", flush=True)
except Exception as e:
    print(f"\nAn Exception happened: {e}")
finally:
    # Save the results first, convert from managed to normal dicts
    dataset.update(mp_dataset)
    progress["seen"] = dict(mp_seen)
    with PROGRESS_FILE.open("wb") as fout:
        pickle.dump(progress, fout)
    # Then we tell workers to disband
    [TileQueue.put(None) for _ in workers]
    print("Waiting for workers...", flush=True)
    for w in workers:
        w.join()
    TileQueue.close()
print("Start processing updated dataset")
Why combine asyncio and multiprocessing? Because the fetch_more_data logic is I/O-bound, so asyncio works well there, while process_item is heavily CPU-bound, so I wanted dedicated processes to do the heavy lifting.
The problem: right before the last print() I always get the message GetOverlappedResult got err 109 several times (always exactly WORKER_COUNT times). Everything works as expected regardless, but the message annoys me. What could be the problem?
Okay, so after a lot of experimentation, I found the (probable) cause: I also have to "shut down" the Manager() instance. (On Windows, error 109 is ERROR_BROKEN_PIPE, and the dict proxies talk to the Manager's server process over named pipes, so the message presumably comes from those connections being dropped uncleanly.) So I changed the finally block to this:
finally:
    # Save the results first, convert from managed to normal dicts
    dataset.update(mp_dataset)
    progress["seen"] = dict(mp_seen)
    with PROGRESS_FILE.open("wb") as fout:
        pickle.dump(progress, fout)
    mgr.shutdown()
    mgr.join()
    # Then we tell workers to disband
    [TileQueue.put(None) for _ in workers]
    time.sleep(1.0)
    TileQueue.close()
    print("Waiting for workers...", flush=True)
    for w in workers:
        w.join()
Now I no longer get GetOverlappedResult got err 109, and I'm happy :-)
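A follow-up note: Manager() can also be used as a context manager, and its __exit__ calls shutdown() automatically, so the explicit shutdown()/join() pair can be avoided. A minimal sketch of the same structure (dataset and seen stand in for the real data):

import multiprocessing as MP

if __name__ == "__main__":
    dataset, seen = {}, set()  # placeholders for the real data
    with MP.Manager() as mgr:
        mp_dataset = mgr.dict(dataset)
        mp_seen = mgr.dict({k: None for k in seen})
        # ... start workers, run the fetch, copy results back ...
        dataset.update(mp_dataset)
    # Leaving the with-block shuts the manager process down cleanly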