我正在构建一个机架中间件,该中间件订阅Redis通道并使用服务器发送事件将消息推送到客户端。Sinatra为此提供了一个不错的DSL。我有一个工作示例,但是,我遇到的问题是,一旦我有7或8个客户端,性能就会大幅下降。在尝试在请求之间重用Redis连接时,我也遇到了"死锁"服务器的问题。
我正在使用Thin为应用程序提供服务(它在后台使用EventMachine)。我认为Sinatra DSL已经处理了EventMachine的并发,但也许这是我需要自己实现的东西?我不想限制自己只使用基于EventMachine的服务器(Thin,Rainbows!),以防有人想使用Puma这样的多线程服务器。我应该做些什么来增加代码中的并发性?
require 'redis'
require 'sinatra/base'
class SSE < Sinatra::Base
def send_message(json)
"id: #{Time.now}n" +
"data: #{json}" +
"rnn"
end
get '/channels/:id/subscribe', provides: 'text/event-stream' do
channel_id = params['id']
stream(:keep_open) do |connection|
Redis.new.subscribe("channels:#{channel_id}") do |on|
on.message do |channel, json|
connection << send_message(json)
end
end
end
end
end
我会想到一些事情,所以我将不按偏序迭代它们。
我正在使用Thin为应用程序提供服务(该应用程序使用发动机罩)。我认为Sinatra DSL已经处理了并发使用EventMachine,但也许这是我需要的执行我自己?
你是对的,Thin使用EventMachine。然而,EventMachine(或任何其他反应器)的问题是,一旦执行同步操作,就会停止整个反应器。因此,要真正获得您期望的并发性,您需要在整个应用程序中继续使用EventMachine。
为支持pub/sub的EventMachine就绪Redis客户端签出em hiredis。
我不想限制自己只能使用基于EventMachine的服务器(瘦,彩虹!)以防有人想使用多线程服务器像彪马
从未尝试过我要说的内容,但我认为在没有尝试的服务器中使用EventMachine不会有问题。只需记住启动您自己的EM。也许在config.ru中?
我在尝试时也遇到了"死锁"服务器的问题在请求之间重用Redis连接
我相信你遇到这种情况的原因是,每次调用"/channes/:id/subscribe"都会打开与Redis的新连接。你只能打开这么多。考虑将Redis.new
重构为应用程序的共享连接。只打开一次。一个Redis连接应该能够处理多个pub/subs。
只是一些想法,我希望它们能有所帮助。
经过大量的研究和实验,以下是我在sinatra+sinatra sse gem:中使用的代码
class EventServer < Sinatra::Base
include Sinatra::SSE
set :connections, []
.
.
.
get '/channel/:channel' do
.
.
.
sse_stream do |out|
settings.connections << out
out.callback {
puts 'Client disconnected from sse';
settings.connections.delete(out);
}
redis.subscribe(channel) do |on|
on.subscribe do |channel, subscriptions|
puts "Subscribed to redis ##{channel}n"
end
on.message do |channel, message|
puts "Message from redis ##{channel}: #{message}n"
message = JSON.parse(message)
.
.
.
if settings.connections.include?(out)
out.push(message)
else
puts 'closing orphaned redis connection'
redis.unsubscribe
end
end
end
end
end
redis连接阻塞on.message,只接受(p)subscribe/(p)unsubscribe命令。一旦取消订阅,redis连接就不再被阻止,并且可以由初始sse请求实例化的web服务器对象释放。当您在redis上收到消息时,它会自动清除,并且集合数组中不再存在与浏览器的sse连接。