听mqtt关于django频道和芹菜的话题



我想要一种将django与mqtt集成的方法,为此,我首先想到的是使用django通道和通过web套接字支持mqtt的mqtt代理,这样我就可以直接在代理和django信道之间进行通信。

然而,我没有找到从django启动websocket客户端的方法,并且由于这个链接,这是不可能的。

由于我也开始研究任务队列,我想知道使用paho mqtt启动mqtt客户端,然后使用celey在单独的进程中运行它是否是一个好的做法。然后,这个过程会通过websockets将代理接收到的消息转发到django通道,这样我也可以与客户端过程通信,在需要时发布数据或停止mqtt客户端,所有这些都直接从django进行。

我对这个想法有点怀疑,因为我也读到在芹菜中运行的过程不应该花太长时间来完成,在这种情况下,这正是我想要做的

所以我的问题是,这有多糟糕?有没有其他选项可以直接将django与mqtt集成?

*注意:我不想在服务器上运行单独的进程,我希望能够从django启动和停止进程,以便从web gui完全控制mqtt客户端

我找到了一种不需要使用芹菜的更好方法。

我只是在app/apps.py上用ready方法启动了一个mqtt客户端,所以每次运行应用程序时都会启动一个客户端。在这里,我可以使用django通道或信号与系统的其他部分进行通信。

apps.py:

from django.apps import AppConfig
from threading import Thread
import paho.mqtt.client as mqtt

class MqttClient(Thread):
def __init__(self, broker, port, timeout, topics):
super(MqttClient, self).__init__()
self.client = mqtt.Client()
self.broker = broker
self.port = port
self.timeout = timeout
self.topics = topics
self.total_messages = 0
#  run method override from Thread class
def run(self):
self.connect_to_broker()
def connect_to_broker(self):
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
self.client.connect(self.broker, self.port, self.timeout)
self.client.loop_forever()
# The callback for when a PUBLISH message is received from the server.
def on_message(self, client, userdata, msg):
self.total_messages = self.total_messages + 1
print(str(msg.payload) + "Total: {}".format(self.total_messages))
# The callback for when the client receives a CONNACK response from the server.
def on_connect(self, client, userdata, flags, rc):
#  Subscribe to a list of topics using a lock to guarantee that a topic is only subscribed once
for topic in self.topics:
client.subscribe(topic)

class CoreConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'core'
def ready(self):
MqttClient("192.168.0.165", 1883, 60, ["teste/01"]).start()

如果在Django应用程序中使用ASGI,则可以使用MQTTAsgi。全面披露我是MQTTAsgi的作者。

它是Django和MQTT的完整协议服务器。

要使用可以运行应用程序的mqtt协议服务器,首先需要创建一个mqtt使用者:

from mqttasgi.consumers import MqttConsumer
class MyMqttConsumer(MqttConsumer):
async def connect(self):
await self.subscribe('my/testing/topic', 2)
async def receive(self, mqtt_message):
print('Received a message at topic:', mqtt_mesage['topic'])
print('With payload', mqtt_message['payload'])
print('And QOS:', mqtt_message['qos'])
pass
async def disconnect(self):
await self.unsubscribe('my/testing/topic')

然后您应该将此协议添加到协议路由器:

application = ProtocolTypeRouter({
'websocket': AllowedHostsOriginValidator(URLRouter([
url('.*', WebsocketConsumer)
])),
'mqtt': MyMqttConsumer,
....
})

然后,您可以使用*:运行mqtt协议服务器

mqttasgi -H localhost -p 1883 my_application.asgi:application

*假设代理位于localhost和端口1883。

我也想解决这个问题,但没有找到真正适合Channels体系结构的好解决方案(尽管MQTTAsgi很接近,但它使用paho mqtt,并且没有完全使用Channels层系统(。

我创建了:https://pypi.org/project/chanmqttproxy/

(src位于https://github.com/lbt/channels-mqtt-proxy)

本质上,它是MQTT的完全异步通道3代理,允许发布和订阅。该文档展示了如何扩展标准Channels教程,以便在MQTT主题上看到聊天消息,并且可以从MQTT主题向所有websocket浏览器客户端发送聊天消息。

就监听MQTT主题而言,我不知道这是OP想要的,但对于一般情况,我认为这是一个很好的解决方案。

最新更新