我被要求在Django应用程序中使用azure服务总线,而不是芹菜。
阅读了提供的文档,但没有清楚地了解使用服务总线而不是芹菜任务。提供的任何建议都将大有帮助。
在开始之前,我想强调Azure Service Bus和Celery之间的区别。
Azure服务总线:
Microsoft Azure Service Bus是一个完全管理的企业集成消息代理。
你可以参考这个来了解更多关于服务总线的信息
芹菜:
分布式任务队列。Celery是一个基于分布式消息传递的异步任务队列/作业队列。
在你的情况下,我可以想到两种可能性:
- 您想使用带有Celery的服务总线来代替其他消息代理
- 用服务总线替换Celery
1:您希望使用带有Celery的服务总线来代替其他消息代理
您可以参考这一点来理解为什么芹菜需要一个消息代理。我不确定你目前使用的是哪一个消息代理,但你可以使用Kombu库来满足你的要求。
Azure服务总线参考:https://docs.celeryproject.org/projects/kombu/en/stable/reference/kombu.transport.azureservicebus.html
他人参考:https://docs.celeryproject.org/projects/kombu/en/stable/reference/index.html
2:用服务总线完全替换Celery满足您的要求:
考虑
- 消息发送者是生产者
- 消息接收者是消费者
这是你必须处理的两个不同的应用程序。
您可以参考以下内容来获得更多的示例代码。
https://github.com/Azure/azure-sdk-for-python/tree/master/sdk/servicebus/azure-servicebus/samples
解释:
- 每次您想执行操作时,都可以发送从生产者客户端发送到主题的消息
- Consumer Client(正在侦听的应用程序(将接收消息并进行处理。您可以将您的自定义流程附加到它上——这样,每当在消费者客户端收到消息时,您的自定义进程就会被执行
以下是接收客户端的示例:
from azure.servicebus.aio import SubscriptionClient
import asyncio
import nest_asyncio
nest_asyncio.apply()
Receiving = True
#Topic 1 receiver :
conn_str= "<>"
name="Allmessages1"
SubsClient = SubscriptionClient.from_connection_string(conn_str, name)
receiver = SubsClient.get_receiver()
async def receive_message_from1():
await receiver.open()
print("Opening the Receiver for Topic1")
async with receiver:
while(Receiving):
msgs = await receiver.fetch_next()
for m in msgs:
print("Received the message from topic 1.....")
##### - Your code to execute when a message is received - ########
print(str(m))
##### - Your code to execute when a message is received - ########
await m.complete()
loop = asyncio.get_event_loop()
topic1receiver = loop.create_task(receive_message_from1())
下一行之间的部分将是每次接收到消息时将执行的指令。
##### - Your code to execute when a message is received - ########