将消息发送回特定的Web Socket



下面是我使用HTML5 Web套接字的代码:

服务器代码:

require 'em-websocket'
class WebSocketsServer
  def self.instance
    @inst ||= self.new
  end
  def initialize
    @messages = Queue.new
    Thread.new do
      puts 'Initializing WebSocketsServer'
      EventMachine.run {
        puts 'EventMachine.run'
        @channel = EM::Channel.new
        puts 'EM::Channel.new'
        EventMachine::WebSocket.start(:host => '0.0.0.0', :port => 9090) do |ws|
          puts 'EventMachine::WebSocket.start'
          ws.onopen {
            puts 'ws.onopen'
            sid = @channel.subscribe { |msg| ws.send msg }
            ws.onmessage { |msg|
              puts 'ws.onmessage'
              @channel.push msg
            }
            ws.onclose {
              puts 'ws.onclose'
              @channel.unsubscribe(sid)
            }
          }
        end
      }
    end
    puts 'Initialized WebSocketsServer'
  end
  def send_message(msg)
    @messages << msg
    send_messages_from_queue
  end
  def send_messages_from_queue
    #if some_condition
      puts "Channel: #{@channel.inspect}n"
      begin
        while data = @messages.pop(true)
          @channel.push(data)
        end
      rescue ThreadError
        #raised if queue is empty
      end
    #end
  end
end

初始化:

require 'web_sockets_server'
unless ( File.basename($0) == 'rake' || defined?(Rails::Console))
  ws = WebSocketsServer.instance
  puts 'Sleeping'
  sleep 10
  puts 'Calling send_message'
  ws.send_message 'Test'
  puts 'Called send_message'
end

客户端JavaScript:

<html>
<head>
<script type="text/javascript">
var socket = new WebSocket("ws://192.168.0.12:9090");
socket.onopen = function(){
  console.log('onopen');
};
socket.onmessage = function(event){
  console.log(event.data);
  socket.close();
};
</script>
</head>
<body>
</body>
</html>

问题是:如何从服务器端发送消息到特定的WebSocket JavaScript客户端?假设我有current_user。客户端的Id,对应current_user。

我不知道如何编写ruby代码,所以对此持保留态度,但是查看代码,我会做如下操作:

既然你有 sid = @channel.subscribe { |msg| ws.send msg }

并在取消订阅时删除它,我假设sid(套接字id)保存在通道中。

因此,您需要像 这样的内容。

ws = @channel.find sid ws.send msg

所以你需要实现@channel。查找(如果不存在)获取与id对应的套接字。

您也可以在订阅队列到通道时自己保留队列,但这是多余的,因为这看起来正是通道应该做的。

所以,EventMachine::ChannelEventMachine::WebSocket是不同的,不相交的东西,它们分开生活,除了通道可以订阅套接字消息。

你甚至可以使用不涉及通道的websocket解决方案。

单播的关键是在ws.onopen处理程序中创建一个新通道,并在数组中维护您自己的通道列表。

您可以为每个套接字连接创建一个通道,或者为多个套接字订阅一个通道(在这种情况下,所有通道的套接字将同时接收数据)。我选择了一个混合解决方案:所有具有相同UserId的浏览器窗口都订阅到相同的通道,因此一个通道代表一个用户。

这是我在RoR端使用的现成代码。我把它原样贴在这里:

require 'em-websocket'
require 'socket'
class WebSocketsServer
  def self.instance
    @inst ||= self.new
  end
  def initialize
    @channels = []
    @messages = Queue.new
    Thread.new do
      EventMachine.run {
        begin
        EventMachine::WebSocket.start(:host => '0.0.0.0', :port => 9090) do |ws|
          ws.class.module_eval { attr_accessor :channel, :custom_sid, :cookie_name, :cookie_value, :page_token }
          ws.onopen { |handshake|
            begin
              log 'ws.onopen'
              cookie = handshake.headers['Cookie']
              ws.cookie_name, ws.cookie_value = cookie.split('=',2) if cookie
              par = Rack::Utils.parse_nested_query(handshake.query_string)
              user_id = par['user_id']
              ptoken = par['page_token']
              if user_id.nil? || ptoken.nil?
                log 'user_id or page_token not defined. Closing websocket.'
                ws.close
              else
                log 'init'
                ws.page_token = ptoken
                channel = @channels.detect {|ch| ch.custom_user_id = user_id}
                unless channel
                  log 'Channel not found. Creating.'
                  channel = EventMachine::Channel.new
                  channel.class.module_eval { attr_accessor :custom_user_id, :sessions }
                  channel.custom_user_id = user_id
                  channel.sessions = []
                  @channels << channel
                end
                if channel.sessions.detect {|sessid| sessid == ws.page_token}
                  log 'Socket already subscribed'
                else
                  log 'Subscribing channel to socket.'
                  ws.channel = channel
                  channel.sessions << ws.page_token
                  ws.custom_sid = channel.subscribe { |msg| ws.send msg }
                end
              end
            rescue Exception => ex
              ws.close
              log "ws.onopen exception: #{ex.message}"
            end
          }
          ws.onmessage { |msg|
            begin
              data = JSON.parse(msg)
              message_type = data['message_type']
              if message_type == 'ping'
                ws.send({ message_type: 'pong' }.to_json)
              end
              if message_type == 'phone_call'
                order_id = data['order_id']
                user_id = data['user_id']
                log "phone call: order_id=#{order_id.inspect}, user_id=#{user_id.inspect}"
                self.send_phone_call(order_id, user_id) if order_id && user_id
              end
            rescue Exception => ex
              ws.close
              log "ws.onmessage exception: #{ex.message}"
            end
          }
          ws.onclose {
            begin
              log 'ws.onclose'
              channel = ws.channel
              if channel
                log 'Unsubscribing channel.'
                channel.sessions.delete(ws.page_token)
                channel.unsubscribe(ws.custom_sid)
                if channel.sessions.length==0
                  log 'No subscribers left. Deleting channel.'
                  #@channels.delete(channel)
                end
              end
            rescue Exception => ex
              log "ws.onclose exception: #{ex.message}"
            end
          }
        end
        rescue Exception => ex
          log "EM.run exception: #{ex.message}"
        end
      }
    end
    init_ipc
  end
  def init_ipc
    ipc = IPC.instance
    ipc.on_send_message do |msg, user_id|
      log 'on_send_message'
      send_message_raw(msg, user_id)
    end
  end
  def send_message_raw(msg, user_id=nil)
    log "send_message_raw msg=#{msg.inspect}"
    @messages << {data: msg, user_id: user_id}
    send_messages_from_queue
  end
  def send_message(msg, user_id=nil)
    IPC.instance.send_message(msg, user_id)
  end
  def send_messages_from_queue
    while msg = @messages.pop(true)
      if msg[:user_id]
        #Сообщение определённому пользователю
        channel = @channels.detect {|ch| ch.custom_user_id = msg[:user_id]}
        channel.push(msg[:data]) if channel
      else
        #Широковещательное сообщение
        @channels.each do |channel|
          channel.push(msg[:data])
        end
      end
    end
  rescue ThreadError
    #raised if queue is empty
  end
  def send_coordinates(work_shift, driver, coord, user_id = nil)
    send_message({
        message_type: 'coordinates',
        workShift: {
            id: work_shift.id,
            #currentState: work_shift.current_state,
            #openedAt: work_shift.opened_at,
            #closedAt: work_shift.closed_at,
            #createdAt: work_shift.created_at,
            #updatedAt: work_shift.updated_at,
            #scheduledOpenedAt: work_shift.scheduled_opened_at,
            #scheduledClosedAt: work_shift.scheduled_closed_at,
            #position: work_shift.position,
            #driver: {
                #id: driver.id,
                #callsign: driver.callsign,
                #name: driver.name,
                #type: driver.condition.title
            #},
            #car: {
            #    id: work_shift.car.id,
            #    brand: work_shift.car.brand
            #},
            #client: {
            #    phoneNumber: (work_shift.trips.first) ? work_shift.trips.first.order.client.phone.number : ''
            #},
            coord: {
                lat: coord.latitude,
                lon: coord.longitude
            }
        }
    }.to_json, user_id)
  end
  def send_order_acceptance(order, trip, is_accepted, order_id, user_id = nil)
    send_message({
        message_type: 'order_acceptance',
        order: {
            accepted: is_accepted,
            id: order_id,
            startPointName: (order && order.points && order.points.first) ? order.points.first.address : '',
            endPointName: (order && order.points && order.points.last) ? order.points.last.address : ''
        },
        trip: {
            id: trip.nil? ? nil : trip.id,
            currentStateTbl: trip.nil? ? nil : trip.current_state_tbl
        }
    }.to_json, user_id)
  end
  def send_call_request(driver, client, link_type, user_id = nil)
    par = {
        message_type: 'call_request',
        target: case link_type
                  when NavServer::LinkType::DRIVER_TO_CLIENT
                    'driver_to_client'
                  when NavServer::LinkType::DISPATCHER_TO_CLIENT
                    'dispatcher_to_client'
                  else
                    'dispatcher_to_driver'
                end
    }
    if driver
      par[:driver] = {
          id: driver.id,
          name: driver.name,
          phone: driver.phone_number
      }
    end
    if client && client.phone
      par[:client] = {
          phone: client.phone.number
      }
    end
    send_message par.to_json, user_id
  end
  def send_alarm(driver, user_id = nil)
    send_message({
        message_type: 'alarm',
        driver: {
            id: driver.id,
            name: driver.name,
            phone: driver.phone_number
        }
    }.to_json, user_id)
  end
  def send_log_item(text, type, user_id = nil)
    send_message({
        message_type: 'log',
        event_type: type,
        text: text
    }.to_json, user_id)
  end
  def send_status(pars, user_id = nil)
    send_message({
        message_type: 'state',
        driver: {
            id: pars[:driver].nil? ? nil : pars[:driver].id
        },
        workshift: {
            id: pars[:workshift_id].nil? ? nil : pars[:workshift_id],
            state: pars[:workshift_state].nil? ? nil : pars[:workshift_state],
            state_str: pars[:workshift_state_str].nil? ? nil : pars[:workshift_state_str],
            tbl_class: pars[:workshift_tbl_class].nil? ? nil : pars[:workshift_tbl_class],
        },
        trip: {
            state: pars[:trip_state].nil? ? nil : pars[:trip_state],
            id: pars[:trip_id].nil? ? nil : pars[:trip_id],
            tbl_class: pars[:trip_tbl_class].nil? ? nil : pars[:trip_tbl_class],
            state_str: pars[:trip_state_str].nil? ? nil : pars[:trip_state_str],
        },
        order: {
            id: pars[:order_id].nil? ? nil : pars[:order_id]
        }
    }.to_json, user_id)
  end
  def send_phone_call(order_id, user_id = nil)
    send_message({
        message_type: 'phone_call',
        order_id: order_id
    }.to_json, user_id)
  end
  private
  def log(msg)
    puts "WS: #{msg}n"
  end
end

相关内容

  • 没有找到相关文章

最新更新