我的消费者的初始化方式:
const client = new kafka.Client(config.ZK_HOST)
const consumer = new kafka.Consumer(client, [{ topic: config.KAFKA_TOPIC, offset: 0}],
{
autoCommit: false
})
现在消费者consumer.on('message', message => applyMessage(message))
事情是applyMessage
使用knex
与数据库对话,代码看起来像这样:
async function applyMessage(message: kafka.Message) {
const usersCount = await db('users').count()
// just assume we ABSOLUTELY need to calculate a number of users,
// so we need previous state
await db('users').insert(inferUserFromMessage(message))
}
上面的代码使 kafka 中的所有消息并行执行applyMessage
,因此在上面的代码中,鉴于数据库中还没有用户,即使对于来自 kafka 的第二条消息,usersCount
也始终为 0,自第一次调用 applyMessage
插入用户以来,它应该是 1。
如何以所有applyMessage
函数按顺序运行的方式"同步"代码?
你需要实现某种互斥体。基本上是一个将事物排队以同步执行的类。例
var Mutex = function() {
this.queue = [];
this.locked = false;
};
Mutex.prototype.enqueue = function(task) {
this.queue.push(task);
if (!this.locked) {
this.dequeue();
}
};
Mutex.prototype.dequeue = function() {
this.locked = true;
const task = this.queue.shift();
if (task) {
this.execute(task);
} else {
this.locked = false;
}
};
Mutex.prototype.execute = async function(task) {
try { await task(); } catch (err) { }
this.dequeue();
}
为了使它工作,你的applyMessage
函数(无论哪个处理 Kafka 消息)需要返回一个Promise
- 另请注意,异步已从父函数移动到返回的 Promise 函数:
function applyMessage(message: kafka.Message) {
return new Promise(async function(resolve,reject) {
try {
const usersCount = await db('users').count()
// just assume we ABSOLUTELY need to calculate a number of users,
// so we need previous state
await db('users').insert(inferUserFromMessage(message))
resolve();
} catch (err) {
reject(err);
}
});
}
最后,每次调用applyMessage
都需要添加到互斥队列中,而不是直接调用:
var mutex = new Mutex();
consumer.on('message', message => mutex.enqueue(function() { return applyMessage(message); }))