数据对象与多个工作节点的一致性



我正在尝试创建一个简单的服务器,它将给不同的工作人员每个新的请求。DATA object是一个单独文件中的简单javascript对象。我面临的问题是这个DATA object的一致性。

如果前一个请求仍在进行中,如何防止worker处理请求?
例如,第一个请求是UPDATE并且持续时间更长,下一个请求是DELETE并且进行得更快
我需要使用什么节点工具或模式来100%确定DELETE将在UPDATE之后发生?
我需要在不同的端口上运行每个worker

const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;
cluster.schedulingPolicy = cluster.SCHED_RR;
const PORT = 4000; 
if (cluster.isMaster) {
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
} else {
http.createServer((req, res) => {
if(req.url === '/users' && req.method === "PUT") {
updateUser(req)
} else if(req.url === '/users' && req.method === "DELETE") {
deleteUser(req)
} 
}).listen(PORT++);
}

每个worker必须保留("lock") DATA对象供独占使用,然后才能更改它。这可以通过写入锁文件并在成功更改对象后再次删除它来实现。

try {
fs.openSync("path/to/lock/file", "wx+");
/* Change DATA object */
fs.rmSync("path/to/lock/file");
} catch(err) {
if (err.code === "EEXIST") throw "locking conflict";
}

执行第一个(UPDATE)请求的工作线程将成功写入锁文件,但是执行第二个(DELETE)请求的并发工作线程将遇到锁冲突。然后,它可以向用户报告失败,或者在短暂的等待时间后重新尝试。

(如果您决定以这种方式实现锁,异步fs方法可能更有效。)

您的代码甚至不会创建多个服务器,将不同的端口放在一边,并且PORT变量是const,因此它也不会增加。

我需要使用什么节点工具或模式来100%确定DELETE将在UPDATE之后发生?

  • 使用某种锁,JavaScript中还没有
  • 使用一个信号量/互斥量变量锁(见代码)。

请记住,JavaScript是单线程语言。

需要在不同的端口上运行每个worker

对于每个worker,根据worker ID设置监听(见代码)。请记住,CPU不能生成与内核数量相等的工作线程。

示例工作代码:

const express = require('express')
const cluster = require('cluster')
const os = require('os')
if (cluster.isMaster) {
for (let i = 0; i < os.cpus().length; i++) {
cluster.fork()
}
} else {
const app = express()
// Global semaphore/mutex variable isUpdating
var isUpdating = false;
const worker = {
handleRequest(req, res) {
console.log("handleRequest on worker /" + cluster.worker.id);
if (req.method == "GET") { // FOR BROWSER TESTING, CHANGE IT LATER TO PUT
isUpdating = true;
console.log("updateUser GET");
// do updateUser(req);
isUpdating = false;
} else if (req.method == "DELETE") {
if (!isUpdating) { // Check for update lock
console.log("updateUser DELETE");
// do deleteUser(req)
}
}
},
}
app.get('/users', (req, res) => {
worker.handleRequest(req, res)
})
// Now each worker will run on different port
app.listen(4000 + cluster.worker.id, () => {
console.log(`Worker ${cluster.worker.id} started listening on port ${4000 + cluster.worker.id}`)
})
}

最新更新