我有一个Django应用程序,它应该不断地监听来自Kafka的消息,然后通过WebSocket将它们发送到客户端。问题是如何设置常量侦听器。为了将来的可扩展性,我们决定在项目中引入Celery来管理这些可扩展性问题。
我的任务实际上看起来像:
class ConsumerTask(Task):
name = 'consume_messages'
def run(self, *args, **kwargs):
consumer = get_kafka_consumer(settings.KAFKA_URL,
settings.FAULT_MESSAGES_KAFKA_TOPIC,
'consumer_messages_group')
logger.info("Kafka's consumer has been started")
while True:
messages = consumer.poll()
for _, messages in messages.items():
messages, messages_count = self.get_message(messages)
if messages_count > 0:
messages = save_to_db()
send_via_websocket_messages(messages)
它通过WS正确地保存和发送消息,但问题来自于任务中的无限循环。由于某种原因(可能是任务超时限制(,若队列中出现任务,则该任务将不再运行。我不确定守护芹菜工人是否能解决这个问题。你能提供一些策略来组织这个过程的"持续运行部分"吗?
您的用例不适合芹菜任务。Celery任务不应该是长时间运行的进程。您需要将任务放入代理队列中,这在您的设置中也没有意义。
把你的while True循环想象成一个芹菜工人。工作人员应该不断地运行,这也是处理更多任务时需要扩大规模的过程。
编写一个Django管理命令,该命令使用while True循环,并使用您将用于扩展celener工人的那种扩展来运行该managemement命令的多个实例。
使用流程管理工具来扩展流程,如honcho或supervisord。