我正在使用EventMachine从Hornetq主题中读取,将其推向EM WebSocket Connections订阅的通道。我需要防止 @topic.receive循环阻止,因此创建了一个proc并正在呼叫EventMachine.defer,而没有回调。这将无限期运行。这很好。我也可以刚刚使用的线程。
我的问题是,这是从流/队列读取并将数据传递到频道的正确方法吗?是否有更好的/其他任何方法?
require 'em-websocket'
require 'torquebox-messaging'
class WebsocketServer
def initialize
@channel = EM::Channel.new
@topic = TorqueBox::Messaging::Topic.new('/topics/mytopic')
end
def start
EventMachine.run do
topic_to_channel = proc do
while true
msg = @topic.receive
@channel.push msg
end
end
EventMachine.defer(topic_to_channel)
EventMachine::WebSocket.start(:host => "127.0.0.1", :port => 8081, :debug => false) do |connection|
connection.onopen do
sid = @channel.subscribe { |msg| connection.send msg }
connection.onclose do
@channel.unsubscribe(sid)
end
end
end
end
end
end
WebsocketServer.new.start
这没关系,但是EM.Defer将产生20个线程,因此我会为您的用例避免使用它。通常,我将完全避免EM,尤其是Java反应堆,因为我们从未完成。
Torquebox在Websockets解决方案上具有本机踩踏解决方案,这将是在这种情况下进行的更好方法,并为您解决了许多其他封装挑战。
如果您真的想坚持使用EM,那么我会使用thread.new而不是延期,以避免出于无缘无故地占用额外的RAM。