等待KafkaJS的领导人选举



形势

我正在使用kafkaj来编写一些动态生成的kafka主题。

我发现在注册制作人后立即写这些主题通常会导致一个错误:There is no leader for this topic-partition as we are in the middle of a leadership election

完整错误为:

{"level":"ERROR","timestamp":"2020-08-24T17:48:40.201Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 5)","broker":"localhost:9092","clientId":"tv-kitchen","error":"There is no leader for this topic-partition as we are in the middle of a leadership election","correlationId":1,"size":146}

守则

以下是导致问题的代码:

import kafka from 'myConfiguredKafkaJs'
const run = async () => {
const producer = kafka.producer()
await producer.connect()
producer.send({
topic: 'myRandomTopicString',
messages: [{
value: 'yolo',
}],
})
}
run()

问题

两个问题:

  1. 在连接到生产者(或发送(时,我应该做什么特别的事情来确保逻辑阻塞,直到生产者真正准备好向kafka主题发送数据
  2. 在向生产者发送数据以确保消息不会被丢弃时,我应该做什么特别的事情吗

解决方案

Kafkajs通过管理客户端提供了一个createTopics方法,该方法有一个可选的waitForLeaders标志:

admin.createTopics({
waitForLeaders: true,
topics: [
{ topic: 'myRandomTopicString123' },
],
}

使用它可以解决问题。

import kafka from 'myConfiguredKafkaJs'
const run = async () => {
const producer = kafka.producer()
const admin = kafka.admin()
await admin.connect()
await producer.connect()
await admin.createTopics({
waitForLeaders: true,
topics: [
{ topic: 'myRandomTopicString123' },
],
})
producer.send({
topic: 'myRandomTopicString',
messages: [{
value: 'yolo',
}],
})
}
run()

不幸的是,如果主题已经存在,这将导致不同的错误,但这是一个单独的问题,我怀疑这个错误更多的是信息性的,而不是破坏性的。

{"level":"ERROR","timestamp":"2020-08-24T18:19:48.465Z","logger":"kafkajs","message":"[Connection] Response CreateTopics(key: 19, version: 2)","broker":"localhost:9092","clientId":"tv-kitchen","error":"Topic with this name already exists","correlationId":2,"size":86}

EDIT:上述设置确实需要正确配置Kafka实例。领导层选举可能永远不会解决,在这种情况下,KafkaJS仍然会抱怨领导层选举!

根据我的经验,这是由于一名卡夫卡经纪人在没有从动物园管理员那里注销的情况下被叫停,这意味着动物园管理员正在等待已经不存在的东西的回应。

最新更新