下面是我使用HTML5 Web套接字的代码:
服务器代码:require 'em-websocket'
class WebSocketsServer
def self.instance
@inst ||= self.new
end
def initialize
@messages = Queue.new
Thread.new do
puts 'Initializing WebSocketsServer'
EventMachine.run {
puts 'EventMachine.run'
@channel = EM::Channel.new
puts 'EM::Channel.new'
EventMachine::WebSocket.start(:host => '0.0.0.0', :port => 9090) do |ws|
puts 'EventMachine::WebSocket.start'
ws.onopen {
puts 'ws.onopen'
sid = @channel.subscribe { |msg| ws.send msg }
ws.onmessage { |msg|
puts 'ws.onmessage'
@channel.push msg
}
ws.onclose {
puts 'ws.onclose'
@channel.unsubscribe(sid)
}
}
end
}
end
puts 'Initialized WebSocketsServer'
end
def send_message(msg)
@messages << msg
send_messages_from_queue
end
def send_messages_from_queue
#if some_condition
puts "Channel: #{@channel.inspect}n"
begin
while data = @messages.pop(true)
@channel.push(data)
end
rescue ThreadError
#raised if queue is empty
end
#end
end
end
初始化:
require 'web_sockets_server'
unless ( File.basename($0) == 'rake' || defined?(Rails::Console))
ws = WebSocketsServer.instance
puts 'Sleeping'
sleep 10
puts 'Calling send_message'
ws.send_message 'Test'
puts 'Called send_message'
end
客户端JavaScript:
<html>
<head>
<script type="text/javascript">
var socket = new WebSocket("ws://192.168.0.12:9090");
socket.onopen = function(){
console.log('onopen');
};
socket.onmessage = function(event){
console.log(event.data);
socket.close();
};
</script>
</head>
<body>
</body>
</html>
问题是:如何从服务器端发送消息到特定的WebSocket JavaScript客户端?假设我有current_user。客户端的Id,对应current_user。
我不知道如何编写ruby代码,所以对此持保留态度,但是查看代码,我会做如下操作:
既然你有
sid = @channel.subscribe { |msg| ws.send msg }
并在取消订阅时删除它,我假设sid(套接字id)保存在通道中。
因此,您需要像 这样的内容。
ws = @channel.find sid
ws.send msg
所以你需要实现@channel。查找(如果不存在)获取与id对应的套接字。
您也可以在订阅队列到通道时自己保留队列,但这是多余的,因为这看起来正是通道应该做的。
所以,EventMachine::Channel
和EventMachine::WebSocket
是不同的,不相交的东西,它们分开生活,除了通道可以订阅套接字消息。
你甚至可以使用不涉及通道的websocket解决方案。
单播的关键是在ws.onopen
处理程序中创建一个新通道,并在数组中维护您自己的通道列表。
您可以为每个套接字连接创建一个通道,或者为多个套接字订阅一个通道(在这种情况下,所有通道的套接字将同时接收数据)。我选择了一个混合解决方案:所有具有相同UserId的浏览器窗口都订阅到相同的通道,因此一个通道代表一个用户。
这是我在RoR端使用的现成代码。我把它原样贴在这里:
require 'em-websocket'
require 'socket'
class WebSocketsServer
def self.instance
@inst ||= self.new
end
def initialize
@channels = []
@messages = Queue.new
Thread.new do
EventMachine.run {
begin
EventMachine::WebSocket.start(:host => '0.0.0.0', :port => 9090) do |ws|
ws.class.module_eval { attr_accessor :channel, :custom_sid, :cookie_name, :cookie_value, :page_token }
ws.onopen { |handshake|
begin
log 'ws.onopen'
cookie = handshake.headers['Cookie']
ws.cookie_name, ws.cookie_value = cookie.split('=',2) if cookie
par = Rack::Utils.parse_nested_query(handshake.query_string)
user_id = par['user_id']
ptoken = par['page_token']
if user_id.nil? || ptoken.nil?
log 'user_id or page_token not defined. Closing websocket.'
ws.close
else
log 'init'
ws.page_token = ptoken
channel = @channels.detect {|ch| ch.custom_user_id = user_id}
unless channel
log 'Channel not found. Creating.'
channel = EventMachine::Channel.new
channel.class.module_eval { attr_accessor :custom_user_id, :sessions }
channel.custom_user_id = user_id
channel.sessions = []
@channels << channel
end
if channel.sessions.detect {|sessid| sessid == ws.page_token}
log 'Socket already subscribed'
else
log 'Subscribing channel to socket.'
ws.channel = channel
channel.sessions << ws.page_token
ws.custom_sid = channel.subscribe { |msg| ws.send msg }
end
end
rescue Exception => ex
ws.close
log "ws.onopen exception: #{ex.message}"
end
}
ws.onmessage { |msg|
begin
data = JSON.parse(msg)
message_type = data['message_type']
if message_type == 'ping'
ws.send({ message_type: 'pong' }.to_json)
end
if message_type == 'phone_call'
order_id = data['order_id']
user_id = data['user_id']
log "phone call: order_id=#{order_id.inspect}, user_id=#{user_id.inspect}"
self.send_phone_call(order_id, user_id) if order_id && user_id
end
rescue Exception => ex
ws.close
log "ws.onmessage exception: #{ex.message}"
end
}
ws.onclose {
begin
log 'ws.onclose'
channel = ws.channel
if channel
log 'Unsubscribing channel.'
channel.sessions.delete(ws.page_token)
channel.unsubscribe(ws.custom_sid)
if channel.sessions.length==0
log 'No subscribers left. Deleting channel.'
#@channels.delete(channel)
end
end
rescue Exception => ex
log "ws.onclose exception: #{ex.message}"
end
}
end
rescue Exception => ex
log "EM.run exception: #{ex.message}"
end
}
end
init_ipc
end
def init_ipc
ipc = IPC.instance
ipc.on_send_message do |msg, user_id|
log 'on_send_message'
send_message_raw(msg, user_id)
end
end
def send_message_raw(msg, user_id=nil)
log "send_message_raw msg=#{msg.inspect}"
@messages << {data: msg, user_id: user_id}
send_messages_from_queue
end
def send_message(msg, user_id=nil)
IPC.instance.send_message(msg, user_id)
end
def send_messages_from_queue
while msg = @messages.pop(true)
if msg[:user_id]
#Сообщение определённому пользователю
channel = @channels.detect {|ch| ch.custom_user_id = msg[:user_id]}
channel.push(msg[:data]) if channel
else
#Широковещательное сообщение
@channels.each do |channel|
channel.push(msg[:data])
end
end
end
rescue ThreadError
#raised if queue is empty
end
def send_coordinates(work_shift, driver, coord, user_id = nil)
send_message({
message_type: 'coordinates',
workShift: {
id: work_shift.id,
#currentState: work_shift.current_state,
#openedAt: work_shift.opened_at,
#closedAt: work_shift.closed_at,
#createdAt: work_shift.created_at,
#updatedAt: work_shift.updated_at,
#scheduledOpenedAt: work_shift.scheduled_opened_at,
#scheduledClosedAt: work_shift.scheduled_closed_at,
#position: work_shift.position,
#driver: {
#id: driver.id,
#callsign: driver.callsign,
#name: driver.name,
#type: driver.condition.title
#},
#car: {
# id: work_shift.car.id,
# brand: work_shift.car.brand
#},
#client: {
# phoneNumber: (work_shift.trips.first) ? work_shift.trips.first.order.client.phone.number : ''
#},
coord: {
lat: coord.latitude,
lon: coord.longitude
}
}
}.to_json, user_id)
end
def send_order_acceptance(order, trip, is_accepted, order_id, user_id = nil)
send_message({
message_type: 'order_acceptance',
order: {
accepted: is_accepted,
id: order_id,
startPointName: (order && order.points && order.points.first) ? order.points.first.address : '',
endPointName: (order && order.points && order.points.last) ? order.points.last.address : ''
},
trip: {
id: trip.nil? ? nil : trip.id,
currentStateTbl: trip.nil? ? nil : trip.current_state_tbl
}
}.to_json, user_id)
end
def send_call_request(driver, client, link_type, user_id = nil)
par = {
message_type: 'call_request',
target: case link_type
when NavServer::LinkType::DRIVER_TO_CLIENT
'driver_to_client'
when NavServer::LinkType::DISPATCHER_TO_CLIENT
'dispatcher_to_client'
else
'dispatcher_to_driver'
end
}
if driver
par[:driver] = {
id: driver.id,
name: driver.name,
phone: driver.phone_number
}
end
if client && client.phone
par[:client] = {
phone: client.phone.number
}
end
send_message par.to_json, user_id
end
def send_alarm(driver, user_id = nil)
send_message({
message_type: 'alarm',
driver: {
id: driver.id,
name: driver.name,
phone: driver.phone_number
}
}.to_json, user_id)
end
def send_log_item(text, type, user_id = nil)
send_message({
message_type: 'log',
event_type: type,
text: text
}.to_json, user_id)
end
def send_status(pars, user_id = nil)
send_message({
message_type: 'state',
driver: {
id: pars[:driver].nil? ? nil : pars[:driver].id
},
workshift: {
id: pars[:workshift_id].nil? ? nil : pars[:workshift_id],
state: pars[:workshift_state].nil? ? nil : pars[:workshift_state],
state_str: pars[:workshift_state_str].nil? ? nil : pars[:workshift_state_str],
tbl_class: pars[:workshift_tbl_class].nil? ? nil : pars[:workshift_tbl_class],
},
trip: {
state: pars[:trip_state].nil? ? nil : pars[:trip_state],
id: pars[:trip_id].nil? ? nil : pars[:trip_id],
tbl_class: pars[:trip_tbl_class].nil? ? nil : pars[:trip_tbl_class],
state_str: pars[:trip_state_str].nil? ? nil : pars[:trip_state_str],
},
order: {
id: pars[:order_id].nil? ? nil : pars[:order_id]
}
}.to_json, user_id)
end
def send_phone_call(order_id, user_id = nil)
send_message({
message_type: 'phone_call',
order_id: order_id
}.to_json, user_id)
end
private
def log(msg)
puts "WS: #{msg}n"
end
end