关于
我有一个类DataRetriever
需要使用 API 凭据进行实例化。我有五组不同的 API 凭据,因此我想实例化DataRetriever
的五个实例。DataRetriever
只有一个公共方法retrieve
顾名思义,该方法将使用基于传递给该方法id
的subprocess
检索一些数据。
- 给定的 API 凭据不能同时打开多个流(具有任何 ID(
- 一个
DataRetriever
最多只能有一个与 API 的连接,因此不得在仍在检索数据流的DataRetriever
实例上调用DataRetriever#retrieve(id)
- 数据量各不相同,因此子进程退出的时间可以是几秒钟到几分钟之间的任何时间
当前的做法
我正在使用示例代码段中所示的queue
。我用需要检索的所有数据流id
填充队列。
def worker():
while True:
item = q.get()
if item is None:
break
do_work(item)
q.task_done()
q = queue.Queue()
threads = []
for i in range(num_worker_threads):
t = threading.Thread(target=worker)
t.start()
threads.append(t)
for item in source():
q.put(item)
# block until all tasks are done
q.join()
# stop workers
for i in range(num_worker_threads):
q.put(None)
for t in threads:
t.join()
问题
我总是可以使用观察者模式,但我想知道是否有 Python 方法来做这样的事情?
- 如何确保上述代码片段中的
worker
将排队的工作负载分配给仅空闲DataRetriever
,同时无缝使用DataRetriever
的所有五个实例? - 在研究时,我发现
ProcessPoolExecutor
无法使示例适应我的场景。这可能是解决方案吗?
您可以执行以下操作:
def worker(q_request, q_response, api_cred):
dr = DataRetriever(api_cred)
while True:
stream_id = q_request.get() # that's blocking unless q.get(False)
if stream_id == "stop":
sys.exit(0)
dr.retrieve(stream_id) # that can take some time (assume blocking)
q_response.put(stream_id) # signal job has ended to parent process
api_cred = [cred1, cred2, cred3, cred4, cred5]
q_request, q_response = queue.Queue(), queue.Queue()
threads = []
for i in range(5):
t = threading.Thread(target=worker, args=(q_request, q_response, api_cred[i]))
t.start()
threads.append(t)
for item in source():
q_request.put(item)
print("Stream ID %s was successfully retrieved." %q_response.get())
这假设dr.retrieve(stream_id)
正在阻塞,或者您有某种方法知道由dr.retrieve(stream_id)
启动的子进程尚未完成,因此您的工作人员将阻塞直到完成(否则DataRetriever
的实现必须更改(。
默认情况下,q.get()
是阻塞的,因此您的worker
进程将与其他进程一起等待对象来获取它。Queue()
对象也是FIFO,因此您可以确保工作将在worker
流程之间均匀分配。