我正在用ruby实现一个小型的thrift(0.6.0)服务器,它扮演另一个协议的代理角色,该协议具有多个连接(多个客户端)到单个服务器。我希望能够在服务器端保存每个客户端的数据,并在多次调用处理程序函数时跟踪"会话"参数。
我目前使用Thrift::NonblockingServer
,因为SimpleServer
似乎不允许并发连接。
我知道如何使用TCPSocket::peeraddr
,但Thrift::NonblockingServer::IOManager::Worker::run
创建了一个临时的MemoryBufferTransport
与它读取的帧,并将其作为输入/输出协议传递给处理器,因此似乎信息没有从那里传递下来。
是否有一个干净的方法来做到这一点?我正在考虑重新定义上面提到的Thrift::NonblockingServer::IOManager::Worker::run
,也包括fd,或其他ID在一个额外的参数来处理或增加原型实例,但我也不得不担心一层生成的ruby代码(class Processor
中的process_*
方法),它似乎有点重。
我想知道以前是否有人做过这样的事。
谢谢!
注。这个问题类似于c++中的thrift问题
我是这样修改Thrift::NonblockingServer::IOManager::Worker::run
来支持这个的
几个事项:
- 正如我在问题中提到的,我不认为它是干净的(如果没有别的,我将不得不监控未来的thrift版本在这个函数中的变化,这是基于0.6.0)。
- 这是为ruby 1.8编写的(否则我会得到未翻译的addr/port,请参阅我的其他问题) 我是一个ruby新手…我敢肯定我已经做了一些这样的"错误"(例如,
- 我知道
Thread.current
哈希线程本地存储有一个命名空间污染问题
$connections
应该是ConnEntry
或diff类中的@@connections
吗?)。首先,在某个中心模块:
module MyThriftExt
$connections={}
class ConnEntry
attr_reader :addr_info, :attr
def initialize(thrift_fd)
@addr_info=thrift_fd.handle.peeraddr
@attr={}
end
def self.PreHandle(fd)
$connections[fd]=ConnEntry.new(fd) unless $connections[fd].is_a? ConnEntry
# make the connection entry as short-term thread-local variable
# (cleared in postHandle)
Thread.current[:connEntry]=$connections[fd]
end
def self.PostHandle()
Thread.current[:connEntry]=nil
end
def to_s()
"#{addr_info}"
end end end
module Thrift class NonblockingServer
class IOManager
alias :old_remove_connection :remove_connection
def remove_connection(fd)
$connections.delete fd
old_remove_connection(fd)
end
class Worker
# The following is verbatim from thrift 0.6.0 except for the two lines
# marked with "Added"
def run
loop do
cmd, *args = @queue.pop
case cmd
when :shutdown
@logger.debug "#{self} is shutting down, goodbye"
break
when :frame
fd, frame = args
begin
otrans = @transport_factory.get_transport(fd)
oprot = @protocol_factory.get_protocol(otrans)
membuf = MemoryBufferTransport.new(frame)
itrans = @transport_factory.get_transport(membuf)
iprot = @protocol_factory.get_protocol(itrans)
MyThriftExt::ConnEntry.PreHandle(fd) # <<== Added
@processor.process(iprot, oprot)
MyThriftExt::ConnEntry.PostHandle # <<== Added
rescue => e
@logger.error "#{Thread.current.inspect} raised error: #{e.inspect}n#{e.backtrace.join("n")}"
end
end
end
end
end
end
end
end
然后在处理程序中的任何点,您可以访问Thread.current[:connEntry].addr_info
以获取连接特定数据或在Thread.current[:connEntry].attr
哈希中存储有关连接的任何内容。