在多个不同的辅助角色之间共享排队的工作负载



关于

我有一个类DataRetriever需要使用 API 凭据进行实例化。我有五组不同的 API 凭据,因此我想实例化DataRetriever的五个实例。DataRetriever只有一个公共方法retrieve顾名思义,该方法将使用基于传递给该方法idsubprocess检索一些数据。

  • 给定的 API 凭据不能同时打开多个流(具有任何 ID(
  • 一个DataRetriever最多只能有一个与 API 的连接,因此不得在仍在检索数据流的DataRetriever实例上调用DataRetriever#retrieve(id)
  • 数据量各不相同,因此子进程退出的时间可以是几秒钟到几分钟之间的任何时间

当前的做法

我正在使用示例代码段中所示的queue。我用需要检索的所有数据流id填充队列。

def worker():
while True:
item = q.get()
if item is None:
break
do_work(item)
q.task_done()
q = queue.Queue()
threads = []
for i in range(num_worker_threads):
t = threading.Thread(target=worker)
t.start()
threads.append(t)
for item in source():
q.put(item)
# block until all tasks are done
q.join()
# stop workers
for i in range(num_worker_threads):
q.put(None)
for t in threads:
t.join()

问题

我总是可以使用观察者模式,但我想知道是否有 Python 方法来做这样的事情?

  • 如何确保上述代码片段中的worker将排队的工作负载分配给仅空闲DataRetriever,同时无缝使用DataRetriever的所有五个实例?
  • 在研究时,我发现ProcessPoolExecutor无法使示例适应我的场景。这可能是解决方案吗?

您可以执行以下操作:

def worker(q_request, q_response, api_cred):
dr = DataRetriever(api_cred)
while True:
stream_id = q_request.get() # that's blocking unless q.get(False)
if stream_id == "stop":
sys.exit(0)
dr.retrieve(stream_id) # that can take some time (assume blocking)
q_response.put(stream_id) # signal job has ended to parent process
api_cred = [cred1, cred2, cred3, cred4, cred5]
q_request, q_response = queue.Queue(), queue.Queue()
threads = []
for i in range(5):
t = threading.Thread(target=worker, args=(q_request, q_response, api_cred[i]))
t.start()
threads.append(t)
for item in source():
q_request.put(item)
print("Stream ID %s was successfully retrieved." %q_response.get())

这假设dr.retrieve(stream_id)正在阻塞,或者您有某种方法知道由dr.retrieve(stream_id)启动的子进程尚未完成,因此您的工作人员将阻塞直到完成(否则DataRetriever的实现必须更改(。

默认情况下,q.get()是阻塞的,因此您的worker进程将与其他进程一起等待对象来获取它。Queue()对象也是FIFO,因此您可以确保工作将在worker流程之间均匀分配。

最新更新