将Redis消息与工作线程池一起使用



我有一个Redis列表,发布者在其中推送一些消息(JSON序列化)。

另一方面,订阅者可以获取每个JSON blob并执行一些操作。最简单的方法是连续地执行此操作。但我想让它快一点;我想维护一个工作进程池(多个使用者),每当有新消息到达时,检查池中是否有可以开始处理的"空闲"进程我正在寻找以下的基于池的版本

while not False:
    _, new_user = conn.blpop('queue:users')
    if not new_user:
        continue
    try:
        process_new_user(new_user, conn)
    except Exception as e:
        print e
    else:
        pass

然而,我无法将其转换为使用Python多处理的代码。池类。文档对也没有帮助

from multiprocessing import Pool

pool = Pool()
while 1:
    _, new_user = conn.blpop('queue:users')
    if not new_user:
        continue
    pool.apply_async(process_new_user, args=(new_user, conn))

如果要处理异常,则需要收集pool.apply_async返回的AsyncResult对象并检查其状态。

如果您可以使用Python 3,concurrent.futures池允许在异步回调中处理结果,从而更容易检查作业退出状态。

相关内容

  • 没有找到相关文章

最新更新