在节俭处理程序函数中获取对等地址



我正在用ruby实现一个小型的thrift(0.6.0)服务器,它扮演另一个协议的代理角色,该协议具有多个连接(多个客户端)到单个服务器。我希望能够在服务器端保存每个客户端的数据,并在多次调用处理程序函数时跟踪"会话"参数。

我目前使用Thrift::NonblockingServer,因为SimpleServer似乎不允许并发连接。

我知道如何使用TCPSocket::peeraddr,但Thrift::NonblockingServer::IOManager::Worker::run创建了一个临时的MemoryBufferTransport与它读取的帧,并将其作为输入/输出协议传递给处理器,因此似乎信息没有从那里传递下来。

是否有一个干净的方法来做到这一点?我正在考虑重新定义上面提到的Thrift::NonblockingServer::IOManager::Worker::run,也包括fd,或其他ID在一个额外的参数来处理或增加原型实例,但我也不得不担心一层生成的ruby代码(class Processor中的process_*方法),它似乎有点重。

我想知道以前是否有人做过这样的事。

谢谢!

注。这个问题类似于c++中的thrift问题

我是这样修改Thrift::NonblockingServer::IOManager::Worker::run来支持这个的

几个事项:

  • 正如我在问题中提到的,我不认为它是干净的(如果没有别的,我将不得不监控未来的thrift版本在这个函数中的变化,这是基于0.6.0)。
  • 这是为ruby 1.8编写的(否则我会得到未翻译的addr/port,请参阅我的其他问题)
  • 我是一个ruby新手…我敢肯定我已经做了一些这样的"错误"(例如,$connections应该是ConnEntry或diff类中的@@connections吗?)。
  • 我知道Thread.current哈希线程本地存储有一个命名空间污染问题

首先,在某个中心模块:

module MyThriftExt
  $connections={}
  class ConnEntry
    attr_reader :addr_info, :attr
    def initialize(thrift_fd)
      @addr_info=thrift_fd.handle.peeraddr
      @attr={}
    end
    def self.PreHandle(fd)
      $connections[fd]=ConnEntry.new(fd) unless $connections[fd].is_a? ConnEntry
      # make the connection entry as short-term thread-local variable 
      # (cleared in postHandle)
      Thread.current[:connEntry]=$connections[fd]
    end
    def self.PostHandle()
      Thread.current[:connEntry]=nil
    end
    def to_s()
      "#{addr_info}"
    end   end end

module Thrift   class NonblockingServer
    class IOManager
      alias :old_remove_connection :remove_connection
      def remove_connection(fd)
        $connections.delete fd
        old_remove_connection(fd)
      end
      class Worker
        # The following is verbatim from thrift 0.6.0 except for the two lines 
        # marked with "Added"
        def run
          loop do
            cmd, *args = @queue.pop
            case cmd
            when :shutdown
              @logger.debug "#{self} is shutting down, goodbye"
              break
            when :frame
              fd, frame = args
              begin
                otrans = @transport_factory.get_transport(fd)
                oprot = @protocol_factory.get_protocol(otrans)
                membuf = MemoryBufferTransport.new(frame)
                itrans = @transport_factory.get_transport(membuf)
                iprot = @protocol_factory.get_protocol(itrans)
                MyThriftExt::ConnEntry.PreHandle(fd)    # <<== Added
                @processor.process(iprot, oprot)
                MyThriftExt::ConnEntry.PostHandle       # <<== Added
              rescue => e
                @logger.error "#{Thread.current.inspect} raised error: #{e.inspect}n#{e.backtrace.join("n")}"
              end
            end
          end
        end
      end
    end   
  end
end

然后在处理程序中的任何点,您可以访问Thread.current[:connEntry].addr_info以获取连接特定数据或在Thread.current[:connEntry].attr哈希中存储有关连接的任何内容。

最新更新