Django Celery - 使用 RabbitMQ 将对象传递给视图和任务之间



这是我第一次使用芹菜,老实说,我不确定我做得对。我的系统必须在Windows上运行,所以我使用RabbitMQ作为代理。

作为概念证明,我正在尝试创建一个对象,其中一个任务设置值,另一个任务读取值,并且我还想在转到某个 url 时显示对象的当前值。但是,我在所有内容之间共享对象时遇到问题。

这是我的 celery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE','cesGroundStation.settings')
app = Celery('cesGroundStation')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
@app.task(bind = True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))

我尝试共享的对象是:

class SchedulerQ():
item = 0
def setItem(self, item):
self.item = item
def getItem(self):
return self.item

这是我的 tasks.py

from celery import shared_task
from time import sleep
from scheduler.schedulerQueue import SchedulerQ
schedulerQ = SchedulerQ()
@shared_task()
def SchedulerThread():
print ("Starting Scheduler") 
counter = 0
while(1):
counter += 1
if(counter > 100):
counter = 0
schedulerQ.setItem(counter)
print("In Scheduler thread - " + str(counter))
sleep(2)
print("Exiting Scheduler")
@shared_task()
def RotatorsThread():
print ("Starting Rotators") 
while(1):
item = schedulerQ.getItem()
print("In Rotators thread - " + str(item))
sleep(2)
print("Exiting Rotators")
@shared_task()
def setSchedulerQ(schedulerQueue):
schedulerQ = schedulerQueue
@shared_task()
def getSchedulerQ():
return schedulerQ

我正在 apps.py 开始我的任务...我不确定这是否是正确的地方,因为任务/工作线程似乎不起作用,直到我在运行celery -A cesGroundStation -l info的单独控制台中启动工作线程。

from django.apps import AppConfig
from scheduler.schedulerQueue import SchedulerQ
from scheduler.tasks import SchedulerThread, RotatorsThread, setSchedulerQ, getSchedulerQ
class SchedulerConfig(AppConfig):
name = 'scheduler'
def ready(self):
schedulerQ = SchedulerQ()
setSchedulerQ.delay(schedulerQ)
SchedulerThread.delay()
RotatorsThread.delay()

在我的 views.py 中,我有这个:

def schedulerQ():
queue = getSchedulerQ.delay()
return HttpResponse("Your list: " + queue)

django 应用程序运行没有错误,但是我从"celery -A cesGroundStation -l info"的输出是这样的:Celery 命令输出

首先,它似乎启动了多个"SchedulerThread"任务,其次,"SchedulerQ"对象没有传递给Rotators,因为它没有读取更新的值。

如果我转到显示views.schedulerQ视图的url,则会出现此错误: 姜戈视图错误

我对Python,Django和Web开发的经验非常非常少,所以我不知道从哪里开始最后一个错误。解决方案建议使用 Redis 将对象传递给视图,但我不知道如何使用 RabbitMQ 做到这一点。稍后,调度器Q对象将实现一个队列,调度器和旋转器将更多地充当生产者/消费者动态,视图显示队列的内容,所以我相信使用数据库可能过于资源密集。如何在所有任务之间共享此对象,这是否是正确的方法?

正确的方法是使用一些持久性层(如数据库或结果后端)来存储要在任务之间共享的信息(在此示例中,您当前在类中放入的内容)。

Celery 在分布式消息传递范式上运行 - 为此示例提炼该想法的一个好方法是,每次调度任务时,您的模块都将独立执行。每当将任务分派给 Celery 时,您必须假设它在单独的解释器中运行并独立于其他任务加载。该SchedulerQ类每次都会重新实例化。

您可以按照前面链接的文档中所述的方式在任务之间共享信息,一些最佳实践提示讨论了数据持久性问题。

最新更新