芹菜队列同时获取多个消息



我正在研究一个基于docker的celry支持的Python应用程序,其中一个功能是触发并发送给定数字的文本消息。工作流程如下:

  1. 用户上传一个CSV文件,其中包含一组文本信息应该发送给的条目
  2. cron作业每60秒轮询数据库以获取任何新条目并将它们添加到队列
  3. 如果发现新条目,将它们放入队列并触发文本消息

目前,如果我上传一个有3个条目的CSV文件,每个文本消息动作都是顺序触发的,而不是并行触发的(默认的芹菜进程bheavy)。例如,如果调度器每隔10秒从队列中取出一个作业,则发送3条文本消息所需的时间将是30秒。由于这些作业是相互独立的,所以我想并行化它们,以便同时发送所有三个文本消息。

我尝试增加队列的并发性,但假设每个线程将被分配三个消息之一,但它不起作用。恐怕我遗漏了什么东西。我是否需要添加一些其他配置,以便我可以并行化作业?

命令运行芹菜队列

celery worker --app=worker.app --concurrency=5 --hostname=worker1@%h --loglevel=INFO-Q queue1 -Ofair

芹菜配置


app = Celery(
'worker',
broker=os.environ['CELERY_BROKER'],
backend=os.environ['RABBITMQ_BACKEND'],
include=['worker.tasks','worker.schedule']
)

app.conf.update(
result_expires=3600,
task_track_started=True,
worker_prefetch_multiplier = 5
)
app.conf.beat_schedule = {
"get-message": {
"task": "worker.schedule.get_new_messages",
"schedule": 10,
'options': {'queue' : 'queue1'}
}
}

您可以使用芹菜中的组

组用于并行执行任务。group函数接受签名列表

可能对参考有帮助,请看下面的文档

https://sayari3.com/articles/18-chains-groups-and-chords-in-celery/

修复此问题的最佳方法是修复cron作业。在cron作业中,不是将消息放入队列,而是调用芹菜任务。当前按顺序处理消息的原因取决于您对worker.schedule.get_new_messages的实现。最有可能的是,该函数一次从队列中取出多个消息,而处理这些消息的函数一次只做一个消息。

解决这个问题的方法是创建一个只发送一条消息的任务。例如:

@app.task('send_my_cool_message')
def send_sms_message(from_, to_, text):
twilio_client = Client(settings.ACCOUNT_SID, settings.AUTH_TOKEN)
twilio_client.messages.create(to=to_, from=from_, body=text)

现在,在您的cron作业中,您为每个消息调用一个芹菜任务:

from celery import Celery
qs = Messages.objects.filter(created_at__gte=last_date_polled)
app = Celery(broker=settings.BROKER_URL, backend=settings.BACKEND_URL)
for message in qs:
app.send_task('send_my_cool_message', kwargs={
'from_': message.from_,
'to_': message.to_,
'text': message.text,
})

最新更新