无法在express中正确管理redis连接



我使用redis包,并且我有很多连接问题,连接突然给出ECONNREFUSED。

我怀疑这是因为我做了错误的连接管理。

这个项目的问题是,我的应用程序向api发送参数(ip和端口(,并且api必须在给定这些值的情况下创建一个连接,获取一些数据并返回。我有数百台服务器,所以我不知道如何管理所有这些连接。

到目前为止,我只在一个连接中管理它。这就是我认为它失败的原因。

目前看起来是这样的。。

let redisClient;
const killRedis = () => {
redisClient.quit()
console.log("redis client shut down")
}

const createRedisClient = async (port, url) => {
redisClient = require('redis').createClient(port, url, {
no_ready_check: true,
db: 1
})
redisClient.on('error', function (err) {
console.log('Error ' + err);
killRedis();
return undefined;
});
redisClient.on('connect', function () {
console.log('Connected to Redis');
});
return redisClient;
}
module.exports = { createRedisClient, }

这有点奏效,但最终失败了,重新连接不时被拒绝。

我在我的路线上像下面这样使用它。

const scanAll = async (port, url) => {
const redisClient = await createRedisClient(port, url)
if (!redisClient) return 500
const scan = promisify(redisClient.scan).bind(redisClient);
const found = [];
let cursor = '0';
do {
const reply = await scan(cursor, 'MATCH', '*');
cursor = reply[0];
found.push(...reply[1]);
} while (cursor !== '0');
return found;
};
/* Return all the users id */
router.post('/user/list', async function (req, res, next) {
const data = await scanAll(req.body.port, req.body.ip);
console.log("data ", data)
if (data === 500) {
res.status(500).json({
error: "Error, server connection refused"
})
}
else if (data.length === 0) {
res.status(204).json(data)
} else {
res.status(200).json(data);
}
})

如何进行正确的连接管理?

编辑:我的新尝试,但我认为在两次同时请愿时,我的联系已经溢出

let connections = []

findConnection = (ip, port) => {
let connection = connections.filter(i => i.ip == ip && i.port == port)
console.log("pre")
console.log(connection)
if (connection && connection.connection) {
console.log("opcion1: ", connection.ip)
console.log("connection already exists")
return connection[0].connection
} else {
console.log("opcion2")
console.log(connections)
connections.push({
ip: ip,
port: port,
connection: require('redis').createClient(port, ip, {
no_ready_check: true,
db: 1
})
})
return connections.filter(i => i.ip == ip && i.port == port)[0].connection
}
}
const createRedisClient = async (port, url) => {
let redisClient = findConnection(url, port)
redisClient.on('error', function (err) {
console.log('Error ' + err);
redisClient.quit()
return undefined;
});
redisClient.on('connect', function () {
console.log('Connected to Redis');
});
return redisClient;
}
module.exports = { createRedisClient, }

我注意到我得到以下错误

MaxListenersExceeddWarning:可能的EventEmitter内存泄漏检测到。添加了11个错误侦听器。使用发射器.setMaxListeners((增加限制

EDIT:最后一个实现问题

My current implementation is the following
let connections = []
const killRedis = (redisClient, ip, port) => {
redisClient.quit()
connections = connections.filter((i) => { return i.ip !== ip && i.port != port })
}
const subscribe = (redisClient, url, port) => {
redisClient.on('error', function (err) {
console.log('Error ' + err);
killRedis(redisClient, url, port)
return undefined;
});
redisClient.on('connect', function () {
console.log('Connected to Redis');
return redisClient;
});
}
findConnection = (ip, port) => {
let connection = connections.filter(i => i.ip == ip && i.port == port)
if (connection && connection.length > 0) {
subscribe(connection[0].connection)
return connection[0].connection
} else {
connections.push({
ip: ip,
port: port,
connection: require('redis').createClient(port, ip, {
no_ready_check: true,
db: 1
})
})
subscribe(connections.filter(i => i.ip == ip && i.port == port)[0].connection, ip, port)
return connections.filter(i => i.ip == ip && i.port == port)[0].connection
}
}
const createRedisClient = async (port, url) => {
let redisClient = findConnection(url, port)
return redisClient
}
module.exports = { createRedisClient }

它几乎可以工作了,问题是我不知道如何处理的错误

我不知道如何处理错误事件侦听器。如果它失败了,我应该返回一个未定义的,但似乎它没有这么做,

您可以构建一个现有连接的缓存池并重用这些连接,而不是为每个新请求创建一个新的Redis连接。这样,您就不会超过每个Redis连接的事件侦听器的限制如果池中还不存在连接,则只需要创建一个连接即可。

PS-为了简单起见,我目前的缓存池实现为每对主机和端口创建一个连接并存储它们。如果需要,您可以在Cache Pool中实现LRU缓存,以收回未使用的Redis连接。

通过这种方式,您应该能够解决连接管理问题,因为您将创建一次连接并重用它们。

缓存池.js

const redis = require('redis');
const Q = require('q');
class CachePool {
constructor() {
this.cachedClients = {};
this.pendingCachedClients = {};
}
getConnection(host, port) {
const deferred = Q.defer();
const connectionId = CachePool.getConnectionId(host, port);
if (this.cachedClients[connectionId]) {
deferred.resolve(this.cachedClients[connectionId]);
} else {
this.cachedClients[connectionId] = redis.createClient({
host,
port,
});
this.cachedClients[connectionId].on('connect', (connection) => {
deferred.resolve(this.cachedClients[connectionId]);
});
this.cachedClients[connectionId].on('error', (err) => {
deferred.reject(err);
});
}
return deferred.promise;
}
static getConnectionId(host, port) {
return `${host}:${port}`;
}
}
module.exports = new CachePool();

cache.js

const { promisify } = require('util');
const CachePool = require('./cache-pool');
class Cache {
constructor(host, port) {
this.host = host;
this.port = port;
}
connect() {
return CachePool.getConnection(this.host, this.port);
}
async scanAll() {
const redisConnection = await this.connect();
const scan = promisify(redisConnection.scan).bind(redisConnection);
const found = [];
let cursor = '0';
do {
const reply = await scan(cursor, 'MATCH', '*');
cursor = reply[0];
found.push(...reply[1]);
} while (cursor !== '0');
return found;
}
}
module.exports = Cache;

测试服务器.js

const express = require('express');
const Cache = require('./cache');
const app = express();
app.use(express.json({ type: '*/json' }));
const port = 3000;
/* Return all the users id */
app.post('/user/list', async function (req, res, next) {
const redisClient = new Cache(req.body.ip, req.body.port);
const data = await redisClient.scanAll();
if (data === 500) {
res.status(500).json({
error: "Error, server connection refused"
})
}
else if (data.length === 0) {
res.status(204).json(data)
} else {
res.status(200).json(data);
}
})
app.listen(port, () => {
console.log(`Example app listening at http://localhost:${port}`)
});

单连接解决方案或缓存池解决方案都可用。使用这两种解决方案会遇到一些问题。

  • 单连接解决方案:ECONNREFUSED

我猜redismaxclient到达时触发了错误,因为您的单连接解决方案在处理每个请求时不会关闭客户端。因此,当未达到maxclient限制时,它有点工作,但以ECONNREFUSED结束。

您可以尝试解决它,只需在请求返回之前添加killRedis即可。

const scanAll = async (port, url) => {
const redisClient = await createRedisClient(port, url)
if (!redisClient) return 500
const scan = promisify(redisClient.scan).bind(redisClient);
const found = [];
let cursor = '0';
do {
const reply = await scan(cursor, 'MATCH', '*');
cursor = reply[0];
found.push(...reply[1]);
} while (cursor !== '0');
killRedis(); // add code here.
return found;
};
  • 缓存池解决方案:MaxListenersExceedWarning

请参阅答案MaxListenersExceedWarning:可能检测到EventEmitter内存泄漏。增加了11条留言。使用发射器.setMaxListeners((增加限制

对于每个请求,在createRedisClient函数中运行redisClient.on()两次,订阅两次,然后添加两个新的侦听器。默认限制为10。由于缺少取消订阅,它将以MaxListenersExceedWarning结束。解决方案是将所有redisClient.on代码从createRedisClient函数移动到findConnection函数,在创建连接时只订阅两次,与用户请求无关。

  • 添加了注释代码
findConnection = (ip, port) => {
let connection = connections.filter(i => i.ip == ip && i.port == port)
if (connection && connection.length > 0) {
// subscribe(connection[0].connection) // should be deleted
return connection[0].connection
} else {
connections.push({
ip: ip,
port: port,
connection: require('redis').createClient(port, ip, {
no_ready_check: true,
db: 1
})
})
subscribe(connections.filter(i => i.ip == ip && i.port == port)[0].connection, ip, port)
return connections.filter(i => i.ip == ip && i.port == port)[0].connection
}
}

您可以使用此npm包

npm i connect-redis

我的最终实现与预期的一样工作

let connections = []
const killRedis = (redisClient, ip, port) => {
redisClient.quit()
connections = connections.filter((i) => { return i.ip !== ip && i.port != port })
}
const subscribe = (redisClient, url, port) => new Promise((resolve, reject) => {
redisClient.on('error', function (err) {
killRedis(redisClient, url, port)
reject(err)
});
redisClient.on('connect', function () {
resolve(redisClient)
});
})
findConnection = async (ip, port) => {
let connection = connections.filter(i => i.ip == ip && i.port == port)
if (connection && connection.length > 0) {
return connection[0].connection
} else {
try {
let newConnection = require('redis').createClient(port, ip, {
no_ready_check: true,
db: 1
})
const client = await subscribe(newConnection, ip, port)
connections.push({
ip: ip,
port: port,
connection: newConnection
})
return client
} catch (error) {
return undefined
}
}
}
const createRedisClient = async (port, url) => {
let redisClient = await findConnection(url, port)
return redisClient
}
module.exports = { createRedisClient }

最新更新