通过 websocket 发送 RabbitMQ 消息



寻找一些代码示例来解决这个问题:-

想编写一些代码(Python 或 Javascript)作为 RabbitMQ 队列的订阅者,以便在收到消息时通过 websocket 将消息广播到任何连接的客户端。

我已经查看了高速公路和node.js(使用"amqp"和"ws"),但无法根据需要工作。这是使用node.js的javascript中的服务器代码:-

var amqp = require('amqp');
var WebSocketServer = require('ws').Server
var connection = amqp.createConnection({host: 'localhost'});
var wss = new WebSocketServer({port:8000});
wss.on('connection',function(ws){
    ws.on('open', function() {
        console.log('connected');
        ws.send(Date.now().toString());
    });
    ws.on('message',function(message){
            console.log('Received: %s',message);
            ws.send(Date.now().toString());
    });
});
connection.on('ready', function(){
    connection.queue('MYQUEUE', {durable:true,autoDelete:false},function(queue){
            console.log(' [*] Waiting for messages. To exit press CTRL+C')
            queue.subscribe(function(msg){
                    console.log(" [x] Received from MYQUEUE %s",msg.data.toString('utf-8'));
                    payload = msg.data.toString('utf-8');
                    // HOW DOES THIS NOW GET SENT VIA WEBSOCKETS ??
            });
    });
});

使用此代码,我可以成功订阅 Rabbit 中的队列并接收发送到队列的任何消息。同样,我可以将 websocket 客户端(例如浏览器)连接到服务器并发送/接收消息。但。。。如何在指定的点将 Rabbit 队列消息的有效负载作为 websocket 消息发送("现在如何通过 WEBSOCKETS 发送")?我认为这与卡在错误的回调中有关,或者它们需要以某种方式嵌套......?

或者,如果这可以在Python中更容易完成(通过Autobahn和pika),那就太好了。

谢谢!

实现系统的一种方法是将python与Tornado一起使用。

这里是服务器:

    import tornado.ioloop
    import tornado.web
    import tornado.websocket
    import os
    import pika
    from threading import Thread

    clients = []
    def threaded_rmq():
        connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"));
        print 'Connected:localhost'
        channel = connection.channel()
        channel.queue_declare(queue="my_queue")
        print 'Consumer ready, on my_queue'
        channel.basic_consume(consumer_callback, queue="my_queue", no_ack=True) 
        channel.start_consuming()

    def consumer_callback(ch, method, properties, body):
            print " [x] Received %r" % (body,)
            for itm in clients:
                itm.write_message(body)
    class SocketHandler(tornado.websocket.WebSocketHandler):
        def open(self):
            print "WebSocket opened"
            clients.append(self)
        def on_message(self, message):
            self.write_message(u"You said: " + message)
        def on_close(self):
            print "WebSocket closed"
            clients.remove(self)

    class MainHandler(tornado.web.RequestHandler):
        def get(self):
            print "get page"
            self.render("websocket.html")

application = tornado.web.Application([
    (r'/ws', SocketHandler),
    (r"/", MainHandler),
])
if __name__ == "__main__":
    thread = Thread(target = threaded_rmq)
    thread.start()
    application.listen(8889)
    tornado.ioloop.IOLoop.instance().start()

这里是 HTML 页面:

<html>
<head>
    <script src="//code.jquery.com/jquery-1.11.0.min.js"></script>
    <script>
    $(document).ready(function() {
      var ws;
       if ('WebSocket' in window) {
           ws = new WebSocket('ws://localhost:8889/ws');
        }
        else if ('MozWebSocket' in window) {
            ws = new MozWebSocket('ws://localhost:8889/ws');
        }
        else {
              alert("<tr><td> your browser doesn't support web socket </td></tr>");
            return;
        }
      ws.onopen = function(evt) { alert("Connection open ...")};
      ws.onmessage = function(evt){
        alert(evt.data);
      };
      function closeConnect(){
          ws.close();
      }

  });
    </script>
</head>
<html>

因此,当您将消息发布到"my_queue"时,该消息将重定向到所有连接的网页。

我希望它能有用

编辑**

在这里 https://github.com/Gsantomaggio/rabbitmqexample 您可以找到完整的示例

相关内容

  • 没有找到相关文章

最新更新