我想要达到的效果
我有一个外部生产者,它将消息发送到RabbitMQ实例到某个Queue。我想实现一个芹菜消费者,它监听这些队列,并在RabbitMQ收到这些队列的消息后立即启动一个任务。
到目前为止我做了什么
我确信RabbitMQ实际接收消息并将它们放入正确的Queue中。我还确保设置了到代理的连接。(我可以执行芹菜任务)
然而,当我试图从文档中实现示例时,预期的日志没有显示。
下面是示例:
from my_proj.celery import app
from celery import bootsteps
class InfoStep(bootsteps.Step):
def __init__(self, parent, **kwargs):
# here we can prepare the Worker/Consumer object
# in any way we want, set attribute defaults, and so on.
print('{0!r} is in init'.format(parent))
def start(self, parent):
# our step is started together with all other Worker/Consumer
# bootsteps.
print('{0!r} is starting'.format(parent))
def stop(self, parent):
# the Consumer calls stop every time the consumer is
# restarted (i.e., connection is lost) and also at shutdown.
# The Worker will call stop at shutdown only.
print('{0!r} is stopping'.format(parent))
def shutdown(self, parent):
# shutdown is called by the Consumer at shutdown, it's not
# called by Worker.
print('{0!r} is shutting down'.format(parent))
app.steps['worker'].add(InfoStep)
app.steps['consumer'].add(InfoStep)
我用celery --app=my_proj.celery:app worker
启动worker。
我错过了什么?我对bootsteps的理解错了吗?
# #编辑:
我尝试了另一种方法与bootsteps.ConsumerStep
像这样:
my_queue = Queue(
"my_queue",
Exchange("my_exchange"),
)
class MyConsumerStep(bootsteps.ConsumerStep):
def get_consumers(self, channel):
return [
Consumer(
channel,
queues=[my_queue],
callbacks=[self.handle_message],
accept=["json"],
)
]
def handle_message(self, body, message):
print("Received message: {0!r}".format(body))
message.ack()
app.steps["consumer"].add(MyConsumerStep)
我仍然遇到消息未被消费或确认的问题。我发现我可以通过调用celery --app=my_proj.celery:app controll add_consumer my_queue
来添加一个消费者,它实际上似乎消耗了消息,但我想在没有命令行命令的情况下启动消费者,具体调用队列。
另外,虽然消息被消费了,但它们不是由我编写的Consumer处理的。相反,我得到以下错误:
[2022-05-31 12:33:44,720: WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!?
The full contents of the message body was: (some details to the message)
我有一个类似的设置,但我已经将队列定义为消费者类的属性。我不知道这有什么区别。
class MyConsumerStep(bootsteps.ConsumerStep):
my_queue = Queue(
"my_queue",
Exchange("my_exchange", type='topic'),
)
def get_consumers(self, channel):
return [
Consumer(
channel,
queues=[self.my_queue],
callbacks=[self.on_message],
accept=["json"],
)
]
def on_message(self, body, message):
print("Received message: {0!r}".format(body))
message.ack()
app.steps["consumer"].add(MyConsumerStep)