如何在django应用程序中用azure服务总线替换celene任务



我被要求在Django应用程序中使用azure服务总线,而不是芹菜。

阅读了提供的文档,但没有清楚地了解使用服务总线而不是芹菜任务。提供的任何建议都将大有帮助。

在开始之前,我想强调Azure Service Bus和Celery之间的区别。

Azure服务总线:

Microsoft Azure Service Bus是一个完全管理的企业集成消息代理。

你可以参考这个来了解更多关于服务总线的信息

芹菜:

分布式任务队列。Celery是一个基于分布式消息传递的异步任务队列/作业队列。

在你的情况下,我可以想到两种可能性:

  1. 您想使用带有Celery的服务总线来代替其他消息代理
  2. 用服务总线替换Celery

1:您希望使用带有Celery的服务总线来代替其他消息代理

您可以参考这一点来理解为什么芹菜需要一个消息代理。我不确定你目前使用的是哪一个消息代理,但你可以使用Kombu库来满足你的要求。

Azure服务总线参考:https://docs.celeryproject.org/projects/kombu/en/stable/reference/kombu.transport.azureservicebus.html

他人参考:https://docs.celeryproject.org/projects/kombu/en/stable/reference/index.html

2:用服务总线完全替换Celery满足您的要求:

考虑

  • 消息发送者是生产者
  • 消息接收者是消费者

这是你必须处理的两个不同的应用程序。

您可以参考以下内容来获得更多的示例代码。

https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/servicebus/azure-servicebus/samples

解释:

  • 每次您想执行操作时,都可以发送从生产者客户端发送到主题的消息
  • Consumer Client(正在侦听的应用程序(将接收消息并进行处理。您可以将您的自定义流程附加到它上——这样,每当在消费者客户端收到消息时,您的自定义进程就会被执行

以下是接收客户端的示例:

from azure.servicebus.aio import SubscriptionClient
import asyncio
import nest_asyncio
nest_asyncio.apply()

Receiving = True
#Topic 1 receiver : 
conn_str= "<>"
name="Allmessages1"
SubsClient = SubscriptionClient.from_connection_string(conn_str, name)
receiver =  SubsClient.get_receiver()
async def receive_message_from1():
await receiver.open()
print("Opening the Receiver for Topic1")
async with receiver:
while(Receiving):
msgs =  await receiver.fetch_next()
for m in msgs:
print("Received the message from topic 1.....")
##### - Your code to execute when a message is received - ########
print(str(m))
##### - Your code to execute when a message is received - ########
await m.complete()


loop = asyncio.get_event_loop()
topic1receiver = loop.create_task(receive_message_from1())

下一行之间的部分将是每次接收到消息时将执行的指令。

##### - Your code to execute when a message is received - ########

相关内容

  • 没有找到相关文章

最新更新