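I have very basic producer-consumer code written in Python with the pika framework. The problem is that the consumer side works through the messages in the queue too slowly. I ran some tests and found that multiprocessing can speed up the workflow by up to 27 times. What I don't know is the correct way to add multiprocessing to my code.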
import pika
import json
from datetime import datetime
from functions import download_xmls

def callback(ch, method, properties, body):
    print('Got something')
    body = json.loads(body)
    type = body[-1]['Type']
    print('Object type in work currently ' + type)
    cnums = [x['cadnum'] for x in body[:-1]]
    print('Got {} cnums to work with'.format(len(cnums)))
    date_start = datetime.now()
    download_xmls(type, cnums)
    date_end = datetime.now()
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print('Download complete in {} seconds'.format((date_end - date_start).total_seconds()))

def consume(queue_name='bot-test'):
    parameters = pika.URLParameters('server@address')
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    channel.queue_declare(queue=queue_name, durable=True)
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(callback, queue='bot-test')
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
How do I add multiprocessing from here?
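Pika has extensive example code that I recommend you look at. Note that this code is only an example. If you do the work on threads, you will need a smarter way to manage those threads.
The goal is to avoid blocking the thread that runs Pika's I/O loop, and to call back into the I/O loop correctly from the worker thread. That is why add_callback_threadsafe exists and is used in that code.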
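To make the add_callback_threadsafe idea concrete, here is a minimal sketch of the pattern, assuming pika 0.12 or later (where BlockingConnection.add_callback_threadsafe is available). The names on_message, do_work, ack_message and the work parameter are made up for illustration; work stands in for whatever slow function you call, such as download_xmls. The worker thread never touches the channel directly; it only asks the connection to run the ack on the I/O loop thread.

```python
import functools
import threading


def ack_message(channel, delivery_tag):
    # Runs on the thread that owns the connection, so using the channel is safe.
    if channel.is_open:
        channel.basic_ack(delivery_tag=delivery_tag)


def do_work(connection, channel, delivery_tag, body, work):
    # Runs on a worker thread. `work` is a placeholder for the slow job.
    work(body)
    # Do not call basic_ack from this thread. Schedule it on the I/O loop.
    cb = functools.partial(ack_message, channel, delivery_tag)
    connection.add_callback_threadsafe(cb)


def on_message(channel, method, properties, body, connection, threads, work):
    # Return quickly so the I/O loop keeps servicing heartbeats.
    t = threading.Thread(
        target=do_work,
        args=(connection, channel, method.delivery_tag, body, work),
    )
    t.start()
    threads.append(t)


# Wiring sketch, assuming the pika 1.0+ basic_consume signature:
#   threads = []
#   cb = functools.partial(on_message, connection=connection,
#                          threads=threads, work=my_slow_function)
#   channel.basic_consume(queue='bot-test', on_message_callback=cb)
#   channel.start_consuming()
```

Raising prefetch_count above 1 is what lets several messages be in flight at once with this approach; with prefetch_count=1 the broker will not deliver the next message until the current one is acked.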
Note: the RabbitMQ team monitors the rabbitmq-users mailing list and only sometimes answers questions on StackOverflow.
import json
import multiprocessing
import concurrent.futures
from datetime import datetime
from multiprocessing import Process

import pika
from functions import download_xmls


def do_job(body):
    # Runs in a child process: parse the message and download the XML files.
    body = json.loads(body)
    obj_type = body[-1]['Type']
    print('Object type in work currently ' + obj_type)
    cnums = [x['cadnum'] for x in body[:-1]]
    print('Got {} cnums to work with'.format(len(cnums)))
    date_start = datetime.now()
    download_xmls(obj_type, cnums)
    date_end = datetime.now()
    print('Download complete in {} seconds'.format((date_end - date_start).total_seconds()))


def callback(ch, method, properties, body):
    print('Got something')
    # args must be a tuple, hence the trailing comma.
    p = Process(target=do_job, args=(body,))
    p.start()
    p.join()
    # ch and method only exist in the consuming process, so ack here,
    # and only when the child finished without an error.
    if p.exitcode == 0:
        ch.basic_ack(delivery_tag=method.delivery_tag)
    else:
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)


def consume(queue_name='bot-test'):
    parameters = pika.URLParameters('server@address')
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    channel.queue_declare(queue=queue_name, durable=True)
    channel.basic_qos(prefetch_count=1)
    # pika 0.x argument order; pika 1.0+ expects
    # basic_consume(queue=queue_name, on_message_callback=callback).
    channel.basic_consume(callback, queue=queue_name)
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()


def get_workers():
    try:
        return multiprocessing.cpu_count()
    except NotImplementedError:
        return 4


if __name__ == '__main__':
    # One consumer (with its own connection) per worker process.
    workers = get_workers()
    with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor:
        for i in range(workers):
            executor.submit(consume)
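The above is just a simple demonstration of how to bring multiprocessing in here. I recommend going through the documentation to optimize the code further and achieve what you need.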
https://docs.python.org/3/library/multiprocessing.html#the-process-class
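Another option, if a single message carries many cnums, is to split the work inside one message across a process pool. This is only a rough sketch under assumptions: chunked, fake_download and download_in_parallel are names I made up, and fake_download is a stand-in for download_xmls (I don't know whether your function can safely be called on sub-lists in parallel).

```python
import concurrent.futures


def chunked(items, size):
    """Split a list into consecutive chunks of at most `size` items."""
    return [items[i:i + size] for i in range(0, len(items), size)]


def fake_download(obj_type, cnums):
    # Hypothetical stand-in for download_xmls(obj_type, cnums).
    # It only reports how many items it was given.
    return len(cnums)


def download_in_parallel(obj_type, cnums, worker=fake_download,
                         chunk_size=10, max_workers=4):
    # Each chunk is handled by a separate process from the pool.
    chunks = chunked(cnums, chunk_size)
    with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(worker, obj_type, chunk) for chunk in chunks]
        # result() re-raises any exception from the child process.
        return [f.result() for f in futures]


if __name__ == '__main__':
    # 25 items in chunks of 10 gives chunks of 10, 10 and 5 items.
    print(download_in_parallel('demo', list(range(25))))  # prints [10, 10, 5]
```

The worker has to be a top-level function so it can be pickled and sent to the child processes. Ack the message only after every chunk has finished, otherwise a failed chunk is lost.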