Soket IO :向特定套接字连接发出,而不是向所有连接发出



在我的角度应用程序中,我有Kafka消费者将所有消息发送到UI,并且它使UI非常繁重地过滤这些消息。

因此,当我通过角度应用程序连接时,我会使用套接字 IO 连接在参数中传递过滤器。我也在消费者端收到该参数,但问题是当第二个连接请求来自其他用户的参数时,它会考虑该参数而不是第一个参数。

以下是我的代码

角度 4 服务方式[ 客户端 ]

getFeed(Ids: any) {
const observable = new Observable(observer => {
this.socket = io.connect('http://loclahost:3007', { query: 'Ids=' + Ids + '' });
this.socket.on('message', (data) => {
observer.next(data);
});
});
return observable;
}

卡夫卡消费者代码 [ 服务器.js ]

'use strict';
let app = require('express')();
let http = require('http').Server(app);
let io = require('socket.io')(http);
var Kafka = require('no-kafka');
var bodyParser = require('body-parser');
let techIds = [];
app.use( bodyParser.json() );       // to support JSON-encoded bodies
app.use(bodyParser.urlencoded({     // to support URL-encoded bodies
extended: true
})); 
app.use(function(req, res, next) {
res.header("Access-Control-Allow-Origin", "*");
res.header("Access-Control-Allow-Headers", "Origin, X-Requested-With, Content-Type, Accept");
next();
});
app.get('/', function (req, res) {
res.send("hello.");
});
//Socket IO Method
io.on('connection', (socket) => {
console.log('USER CONNECTED');
this.techIds = socket.handshake.query['Ids'];
console.log(this.techIds);
socket.on('disconnect', function(){
console.log('USER DISCONNECTED');
});
});
http.listen(3007, () => {
console.log('started on port 3007');
var consumer = new Kafka.SimpleConsumer({
connectionString: 'localhost:29092,localhost:29093,localhost:29094',
clientId: 'no-kafka-client'
});
var dataHandler = function (messageSet, topic, partition) {
messageSet.forEach((m) => {
console.log(topic, partition, m.offset, m.message.value.toString('utf8'));
if(topic=="MyMessage")
{
const msg = JSON.parse(m.message.value.toString('utf8'));
if(this.techIds != null && this.techIds != undefined && this.techIds.indexOf(msg.techID.toLowerCase()) > -1)
io.emit('message', JSON.parse(m.message.value.toString('utf8')));
}
});
}.bind(this);
return consumer.init().then(function () {
var v1= consumer.subscribe('JourneyDetails', [0, 1], dataHandler);
var arr=[];
arr.push([v1]);
return arr;
});
});

例如

第一个用户的套接字连接请求 http://localhost:3007?Ids=pm1,pm2,pm3

第二个用户的套接字连接请求是 http://localhost:3007?Ids=pm8,pm9,pm10

因此,在此行中参数值被第二个请求覆盖。

if(this.techIds != null && this.techIds != undefined && this.techIds.indexOf(msg.techID.toLowerCase()) > -1)
io.emit('message', JSON.parse(m.message.value.toString('utf8')));

在这里,我得到这个.techIds值"pm8,pm9,pm10",所以在第一个请求中,我收到pm8,pm9,pm10的消息,而不是pm1,pm2,p3。

任何建议或帮助将不胜感激。

谢谢 普什卡

我对我作为解决方案的一部分所做的不满意,但现在正在运行一些东西。

不高兴,因为我面临客户端阵列的内存问题。当超过 1000 个用户连接时,它会变得如此沉重。我的 for 循环需要太多时间来过滤记录。

请随时提供您的建议/输入以优化以下代码。

'use strict';
let app = require('express')();
let http = require('http').Server(app);
let io = require('socket.io')(http);
var Kafka = require('no-kafka');
var bodyParser = require('body-parser');
let techIds = [];
app.use( bodyParser.json() );       // to support JSON-encoded bodies
app.use(bodyParser.urlencoded({     // to support URL-encoded bodies
extended: true
})); 
app.use(function(req, res, next) {
res.header("Access-Control-Allow-Origin", "*");
res.header("Access-Control-Allow-Headers", "Origin, X-Requested-With, Content-Type, Accept");
next();
});
app.get('/', function (req, res) {
res.send("hello.");
});
//Socket IO Method
io.on('connection', (socket) => {
console.log('USER CONNECTED');
socket.on('join', function (data) {
clients.push(
{
'socketId': socket.id,
'mgrId': data.mgrId,
'techIds': data.attuIds
});
//console.log(socket.id + ' ' + data.mgrId + ' USER CONNECTED!!');
});
socket.on('disconnect', function (data) {
if (clients.length > 0) {
let item = clients.find(x => x.socketId == socket.id);
const index = clients.indexOf(item);
if (index > -1) {
//console.log(clients[index].mgrId + ' USER DISCONNECTED!!');
clients.splice(index, 1);
// console.log(clients);
}
}
});
});
http.listen(3007, () => {
console.log('started on port 3007');
var consumer = new Kafka.SimpleConsumer({
connectionString: 'localhost:29092,localhost:29093,localhost:29094',
clientId: 'no-kafka-client'
});
var dataHandler = function (messageSet, topic, partition) {
messageSet.forEach((m) => {
console.log(topic, partition, m.offset, m.message.value.toString('utf8'));
if(topic=="MyMessage")
{
const msg = JSON.parse(m.message.value.toString('utf8'));
if (clients.length > 0) {
for (var index = 0; index < clients.length; index++) {
var ids = clients[index].techIds;
var idx = ids.indexOf(msg.techID.toLowerCase());
if (idx > -1) {
if (io.sockets.connected[clients[index].socketId]) {
io.sockets.connected[clients[index].socketId].emit('message', msg);
}
}
}
}
}
});
}.bind(this);
return consumer.init().then(function () {
var v1= consumer.subscribe('MyMessage', [0, 1], dataHandler);
var arr=[];
arr.push([v1]);
return arr;
});
});

我认为你应该阅读这篇文章:https://socket.io/docs/emit-cheatsheet/#

io.on('connect', onConnect);
function onConnect(socket){
// sending to the client
socket.emit('hello', 'can you hear me?', 1, 2, 'abc');
// sending to all clients except sender
socket.broadcast.emit('broadcast', 'hello friends!');
// sending to all clients in 'game' room except sender
socket.to('game').emit('nice game', "let's play a game");
// sending to all clients in 'game1' and/or in 'game2' room, except sender
socket.to('game1').to('game2').emit('nice game', "let's play a game (too)");
// sending to all clients in 'game' room, including sender
io.in('game').emit('big-announcement', 'the game will start soon');
// sending to all clients in namespace 'myNamespace', including sender
io.of('myNamespace').emit('bigger-announcement', 'the tournament will start soon');
// sending to individual socketid (private message)
socket.to(<socketid>).emit('hey', 'I just met you');
// sending with acknowledgement
socket.emit('question', 'do you think so?', function (answer) {});
// sending without compression
socket.compress(false).emit('uncompressed', "that's rough");
// sending a message that might be dropped if the client is not ready to receive messages
socket.volatile.emit('maybe', 'do you really need it?');
// sending to all clients on this node (when using multiple nodes)
io.local.emit('hi', 'my lovely babies');
};

最新更新