Why do I keep getting GetOverlappedResult error 109?



So I have a piece of code like this:

mgr = MP.Manager()
mp_dataset = mgr.dict(dataset)
mp_seen = mgr.dict({k: None for k in seen})
mp_barrier = MP.Barrier(WORKER_COUNT + 1)  # +1 to include main process
# TileQueue is a global var
workers = [
    MP.Process(target=process_item_worker, args=(TileQueue, mp_dataset, mp_seen, mp_barrier))
    for _ in range(WORKER_COUNT)
]
[worker.start() for worker in workers]
print("Waiting for workers...")
mp_barrier.wait()
start_t = time.monotonic()
try:
    asyncio.run(fetch_more_data())
    elapsed_t = time.monotonic() - start_t
    print(f"\nFetching finished in {elapsed_t:,.2f} seconds", flush=True)
except Exception as e:
    print(f"\nAn Exception happened: {e}")
finally:
    # Save the results first, convert from managed to normal dicts
    dataset.update(mp_dataset)
    progress["seen"] = dict(mp_seen)
    with PROGRESS_FILE.open("wb") as fout:
        pickle.dump(progress, fout)
    # Then we tell workers to disband
    [TileQueue.put(None) for _ in workers]
    print("Waiting for workers...", flush=True)
    for w in workers:
        w.join()
    TileQueue.close()
print("Start processing updated dataset")
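The post doesn't show `process_item_worker` itself. A minimal sketch of what it presumably looks like, assuming it waits at the barrier and then drains `TileQueue` until the `None` sentinel (the body is my assumption, not the post's code):

```python
def process_item_worker(queue, mp_dataset, mp_seen, mp_barrier):
    """Hypothetical sketch of the worker loop (the original isn't shown):
    wait at the barrier, then consume items until the None sentinel."""
    mp_barrier.wait()  # line up with the main process before starting
    while True:
        item = queue.get()
        if item is None:  # sentinel pushed from the finally block
            break
        # The CPU-heavy process_item(item) call would go here, writing
        # its results into the managed mp_dataset / mp_seen dicts.
```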

Why combine asyncio and multiprocessing? Because the fetch_more_data logic is I/O-bound, so async works well there, while process_item is heavily CPU-bound, so I want dedicated processes to do the heavy lifting.

The problem:

I always get the message `GetOverlappedResult got err 109` several times (always exactly WORKER_COUNT times) right before the last print().

Everything works as expected, though. But that message annoys me.

What could be the problem?

OK, so after a lot of experimenting, I found the (probable) cause:

I also have to "end" the Manager() instance.

So I changed the finally block to this:

finally:
    # Save the results first, convert from managed to normal dicts
    dataset.update(mp_dataset)
    progress["seen"] = dict(mp_seen)
    with PROGRESS_FILE.open("wb") as fout:
        pickle.dump(progress, fout)
    mgr.shutdown()
    mgr.join()
    # Then we tell workers to disband
    [TileQueue.put(None) for _ in workers]
    time.sleep(1.0)
    TileQueue.close()
    print("Waiting for workers...", flush=True)
    for w in workers:
        w.join()

Now I no longer get `GetOverlappedResult got err 109`, and I'm happy :-)
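For context, error 109 on Windows is ERROR_BROKEN_PIPE ("The pipe has been ended."), which fits this explanation: if the Manager process is torn down implicitly at interpreter exit, anything still connected to its named pipes sees the pipe break. A tidier way to get the same deterministic shutdown is to use Manager() as a context manager, which calls shutdown() when the block exits. A minimal sketch (not the post's code):

```python
import multiprocessing as MP

def run():
    # Manager() as a context manager: shutdown() runs when the block
    # exits, even if an exception is raised inside it.
    with MP.Manager() as mgr:
        shared = mgr.dict()
        shared["done"] = True
        snapshot = dict(shared)  # copy out before the manager goes away
    # Here the manager process is gone; the `shared` proxy is unusable.
    return snapshot

if __name__ == "__main__":
    print(run())  # → {'done': True}
```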
