如何使用Python多处理池永久地从队列中消费项目



我正在尝试创建一个监听http请求并将作业id添加到队列的工作人员。我使用Python的内置多处理模块。

我需要一个有几个进程的池,这些进程将处理来自队列和重生的作业。进程必须重新启动,因为在某些情况下,作业处理可能会导致内存泄漏。池应该永远运行,因为项目将被动态地添加到队列中。

问题是我的池不重生工人后,他们完成。

我如何使用pool来实现这一点?我希望它永远运行,从队列中消费项目,并在每个任务后重生子。

from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from multiprocessing import Pool, SimpleQueue, current_process
queue = SimpleQueue()
def do_something(q):
worker_id = current_process().pid
print(f"Worker {worker_id} spawned")
item_id = q.get()
print(f"Worker {worker_id} received id: {item_id}")
# long_term_operation_that_leaks_memory(item_id)
# print(f"Worker {worker_id} completed id: {item_id}")
def main():
with Pool(
processes=2, initializer=do_something, initargs=(queue,), maxtasksperchild=1
):
queue.put("a")
queue.put("b")
queue.put("c")
server_address = ("", 8000)
httpd = ThreadingHTTPServer(server_address, BaseHTTPRequestHandler)
try:
httpd.serve_forever()
except (KeyboardInterrupt, SystemExit):
pass
if __name__ == "__main__":
main()

我尝试了initializermaxtasksperchild,但它不起作用。

我知道我可以使用map向池中添加新进程,但我没有未来无限可能任务的映射。我认为initializer应该负责所有的新任务。但我不知道如何强迫它永远运行和重生。

在我的代码示例"c"项永远不会被处理。因此,如果我添加http逻辑来放置更多的项目,它也不会起作用。将http逻辑添加到此代码中不是我问题的必要部分,但欢迎任何提示。

谢谢!

编辑:

在这种情况下,我决定使用Pool的原因是官方文档说:

一个池内的工作进程通常活到完成池的工作队列的持续时间。在其他方面发现的频繁模式系统(如Apache, mod_wsgi等)来释放资源Workers是允许池内的一个worker只完成一个集合在退出之前的工作量,被清理和一个新的生成进程以替换旧进程。maxtasksperchild参数to the Pool向最终用户公开此功能。

我的目标:

  • 项目将通过http请求动态添加到队列
  • 池将永远存在
  • 工作进程将只执行队列中的一个任务,并将重生

为什么我只用了2个进程?

进程数不是无限的,用2个进程而不是5个或10个进程来测试我的例子很容易。

为什么我手动放置3个项目?例如,在实际的解决方案中,所有的项目都是动态添加的,所以没有办法在它们上面循环或使用map。

在我看来,也许您并不真的需要这里的pool,并且可以为每个任务创建一个新的Process。如果您想限制一次存在多少个任务,您可以使用Semaphore来限制进程的创建,并在每个任务完成之前释放该信号量:

from multiprocessing import Process, BoundedSemaphore
from time import sleep
def do_work(A, B):
sleep(.4)
print(A, B)
def worker(sema, *args):
try:
do_work(*args)
finally:
sema.release() #allow a new process to be started now that this one is exiting
def main():
tasks = zip(range(65,91), bytes(range(65,91)).decode())
sema = BoundedSemaphore(4) #only every 4 workers at a time
procs = []
for arglist in tasks:
sema.acquire() #wait to start until another process is finished
procs.append(Process(target=worker, args=(sema, *arglist)))
procs[-1].start()
#cleanup completed processes
while not procs[0].is_alive():
procs.pop(0)
for p in procs:
p.join() #wait for any remaining tasks
print("done")
if __name__ == "__main__":
main()

您使用池初始化项所做的事情是最不寻常的。这样的初始化器为每个池进程运行,并用于初始化该进程(例如,设置全局变量),以便它能够运行提交的任务。多处理池实现了一个隐藏任务队列,用于保存等待可用池进程处理的提交任务。您的初始化程序代码只能执行单个类任务(我保留术语任务用于在"正常的"代码中提交给处理池的工作)。方式),然后返回。也就是说,您将3个项目放在队列中,但只有2个池进程从队列中获取单个项目,处理它然后返回。这对我来说没有任何意义。

你的代码没有显示你的HTTP服务器和在你的多处理池中运行的任务之间的关系,我不会猜测那可能是什么。所以我将只展示使用池的更标准的方法。我已经删除了maxtasksperchild参数,因为它只在池执行"正常"时才相关。添加到任务队列的任务,例如,使用apply_asyncmap方法。因此,它在你的代码中没有完成任何事情。

from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from multiprocessing import Pool, current_process

def do_something(item_id):
worker_id = current_process().pid
print(f"Worker {worker_id} received id: {item_id}")
# long_term_operation_that_leaks_memory(item_id)
# print(f"Worker {worker_id} completed id: {item_id}")
def main():
# Why only 2 processes in the pool?:
pool = Pool(processes=2)
pool.apply_async(do_something, args=('a',))
pool.apply_async(do_something, args=('b',))
pool.apply_async(do_something, args=('c',))
server_address = ("", 8000)
httpd = ThreadingHTTPServer(server_address, BaseHTTPRequestHandler)
try:
httpd.serve_forever()
except (KeyboardInterrupt, SystemExit):
pass
# Wait for submitted tasks to complete:
pool.close()
pool.join()
if __name__ == "__main__":
main()

打印:

Worker 15560 received id: a
Worker 8132 received id: b
Worker 15560 received id: c

相关内容

  • 没有找到相关文章

最新更新