我想定义一个n个工人的池,并让每个工人执行在rabbitmq队列中持有的任务。当这个任务完成(失败或成功)时,我希望worker从队列中执行另一个任务。
我可以在文档中看到如何生成一个工人池,并让他们都等待他们的兄弟姐妹完成。我想要一些不同的东西,虽然:我想有n个任务的缓冲区,当一个工人完成它添加另一个任务到缓冲区(所以不超过n个任务在bugger)。我很难在文档中搜索到这个。
对于上下文,我的非多线程代码如下:
while True:
message = get_frame_from_queue() # get message from rabbit mq
do_task(message.body) #body defines urls to download file
acknowledge_complete(message) # tell rabbitmq the message is acknowledged
在这个阶段,我的"多线程"实现看起来像这样:
@recieves('ask_for_a_job')
def get_a_task():
# this function is executed when `ask_for_a_job` signal is fired
message = get_frame_from_queue()
do_task(message)
def do_tasks(task_info):
try:
# do stuff
finally:
# once the "worker" has finished start another.
fire_fignal('ask_for_a_job')
# start the "workers"
for i in range(5):
fire_fignal('ask_for_a_job')
我不想重新发明轮子。是否有一种更内置的方式来实现这一点?
get_frame_from_queue
不是线程安全的。您应该能够让每个子进程/线程直接从队列中消费,然后在每个线程中,简单地从队列中处理,就像您同步处理一样。
from threading import Thread
def do_task(msg):
# Do stuff here
def consume():
while True:
message = get_frame_from_queue()
do_task(message.body)
acknowledge_complete(message)
if __name __ == "__main__":
threads = []
for i in range(5):
t = Thread(target=consume)
t.start()
threads.append(t)
这样,您将始终同时处理来自队列的N条消息,而不需要在线程之间发生信令。
唯一的"问题"是你使用的rabbitmq库的线程安全性。根据它的实现方式,您可能需要每个线程单独连接,或者每个线程一个通道连接,等等。
一个解决方案是利用multiprocessing.Pool
对象。使用外部循环从RabbitMQ获取N项。将项目输入到池中,直到整个批处理完成。然后循环处理批处理,确认每条消息。最后继续外循环。
import multiprocessing
def worker(word):
return bool(word=='whiskey')
messages = ['syrup', 'whiskey', 'bitters']
BATCHSIZE = 2
pool = multiprocessing.Pool(BATCHSIZE)
while messages:
# take first few messages, one per worker
batch,messages = messages[:BATCHSIZE],messages[BATCHSIZE:]
print 'BATCH:',
for res in pool.imap_unordered(worker, batch):
print res,
print
# TODO: acknowledge msgs in 'batch'
输出BATCH: False True
BATCH: False
import multiprocessing
def worker(word):
return bool(word=='whiskey')
messages = ['syrup', 'whiskey', 'bitters']
BATCHSIZE = 2
pool = multiprocessing.Pool(BATCHSIZE)
while messages:
# take first few messages, one per worker
batch,messages = messages[:BATCHSIZE],messages[BATCHSIZE:]
print 'BATCH:',
for res in pool.imap_unordered(worker, batch):
print res,
print
# TODO: acknowledge msgs in 'batch'
BATCH: False True
BATCH: False