I need to use a pool to asynchronously parse the results coming from an extraction method and send those results to a write queue.
I have tried this, but it only seems to run iteratively... one process after another:
results = []
process_pool = Pool(processes=30, maxtasksperchild=1)
while True:
    filepath = read_queue.get(True)
    if filepath is None:
        break
    res = process_pool.apply_async(func=process.run, args=(filepath, final_path), callback=write_queue.put)
    results.append(res)

for result in results:
    result.wait()

process_pool.close()
process_pool.join()
I have also tried waiting on each result, but that behaves the same as the above:
process_pool = Pool(processes=30, maxtasksperchild=1)
while True:
    filepath = read_queue.get(True)
    if filepath is None:
        break
    res = process_pool.apply_async(func=process.run, args=(filepath, final_path), callback=write_queue.put)
    res.wait()

process_pool.close()
process_pool.join()
I have also tried just scheduling the processes and letting the pool block on its own when it has no free workers to spawn:
process_pool = Pool(processes=30, maxtasksperchild=1)
while True:
    filepath = read_queue.get(True)
    if filepath is None:
        break
    process_pool.apply_async(func=process.run, args=(filepath, final_path), callback=write_queue.put)

process_pool.close()
process_pool.join()
This does not work either; it just cycles through the processes over and over and never actually runs the function, and I am not sure why. It seems like I have to do something with the pool's AsyncResult for the task to actually be scheduled.
I need it to work like this:
- When there is an item waiting in the queue, spawn a new process in the pool with that item's specific arguments.
- On callback, put the processed result into the write queue.
However, I can't seem to get this working correctly. It only works iteratively, because I have to do something with the result to actually get the task scheduled properly, whether that is a .get, a .wait, or whatever.
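For clarity, here is a minimal sketch of the wiring I am aiming for; the worker function, queue sizes, and worker count below are placeholders rather than my real code:

from multiprocessing import Pool, Queue

def worker(item):
    return item * 2  # stand-in for the real parsing work

if __name__ == '__main__':
    read_queue, write_queue = Queue(), Queue()
    for i in range(100):
        read_queue.put(i)
    read_queue.put(None)  # sentinel so the dispatch loop can stop

    pool = Pool(processes=4, maxtasksperchild=1)
    while True:
        item = read_queue.get(True)
        if item is None:
            break
        # every queued item becomes one pool task; the callback forwards
        # the processed result straight into the writer's queue
        pool.apply_async(worker, args=(item,), callback=write_queue.put)
    pool.close()
    pool.join()
    write_queue.put(None)  # tell the writer there is nothing more coming

My actual code is below.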
# write.py
import bz2
from pathlib import Path
from queue import Empty
from multiprocessing import Process, Queue

def write(p_list):
    outfile = Path('outfile.txt.bz2')
    for data in p_list:
        if Path.exists(outfile):
            mode = 'ab'
        else:
            mode = 'wb'
        with bz2.open(filename=outfile, mode=mode, compresslevel=9) as output:
            temp = (str(data) + '\n').encode('utf-8')
            output.write(temp)
    print('JSON files written', flush=True)

class Write(Process):
    def __init__(self, write_queue: Queue):
        Process.__init__(self)
        self.write_queue = write_queue

    def run(self):
        while True:
            try:
                # wait up to 900 seconds for a batch before checking again
                p_list = self.write_queue.get(True, 900)
            except Empty:
                continue
            if p_list is None:
                break
            write(p_list)
-
# process.py
import time

def parse(data: int):
    global json_list
    time.sleep(.1)  # simulate parsing the json
    json_list.append(data)

def read(data: int):
    time.sleep(.1)
    parse(data)

def run(data: int):
    global json_list
    json_list = []
    read(data)
    return json_list

if __name__ == '__main__':
    global output_path, json_list
-
# main.py
from multiprocessing import Pool, Queue

import process
from write import Write

if __name__ == '__main__':
    read_queue = Queue()
    write_queue = Queue()

    write = Write(write_queue=write_queue)
    write.daemon = True
    write.start()

    for i in range(0, 1000000):
        read_queue.put(i)
    read_queue.put(None)

    process_pool = Pool(processes=30, maxtasksperchild=1)
    while True:
        data = read_queue.get(True)
        if data is None:
            break
        res = process_pool.apply_async(func=process.run, args=(data,), callback=write_queue.put)

    process_pool.close()
    process_pool.join()
    # signal the writer only after every callback has had a chance to run
    write_queue.put(None)
    write.join()
    print('process done')
So, the problem is that there was no problem. I was just being dense. If you set the maximum tasks per worker to 1, the processes get scheduled very quickly, and it looks as though nothing is happening (or maybe I was the only one who thought so).
Here is a reasonable way to use an asynchronous process pool correctly inside a while loop with maxtasksperchild=1:
import multiprocessing
import time

num_processes = 30  # tune this to your machine

def func(elem):
    time.sleep(0.5)
    return elem

def callback(elem):
    # do something with the processed data
    pass

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    for i in range(0, 10000):
        queue.put(i)
    queue.put(None)  # sentinel so the dispatch loop below can exit

    process_pool = multiprocessing.Pool(processes=num_processes, maxtasksperchild=1)
    results = []
    while True:
        data = queue.get(True)
        if data is None:
            break
        res = process_pool.apply_async(func=func, args=(data,), callback=callback)
        results.append(res)

    flag = False
    for i, res in enumerate(results):
        try:
            # wait() never raises on timeout, so use get() to detect one
            res.get(timeout=600)
            # do some logging
            results[i] = None
        except multiprocessing.TimeoutError:
            flag = True
            # do some logging

    process_pool.close()
    if flag:
        process_pool.terminate()
    process_pool.join()
    # done!
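To convince yourself that the tasks really do run in parallel even with maxtasksperchild=1, a quick timing check is enough. The snippet below is only an illustration (the function name and worker count are arbitrary), not part of the code above: thirty half-second tasks spread over thirty workers should finish far sooner than the ~15 seconds a sequential run would take.

import multiprocessing
import time

def slow_square(x):
    time.sleep(0.5)  # simulate half a second of work
    return x * x

if __name__ == '__main__':
    # maxtasksperchild=1 starts a fresh worker for every task, which is why
    # scheduling looks instant -- the work itself still happens in parallel
    with multiprocessing.Pool(processes=30, maxtasksperchild=1) as pool:
        start = time.monotonic()
        async_results = [pool.apply_async(slow_square, (i,)) for i in range(30)]
        values = [r.get(timeout=30) for r in async_results]
        elapsed = time.monotonic() - start
    print(values[:5], f'elapsed: {elapsed:.2f}s')  # far less than ~15 s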