Kafka 是否支持每条记录提取



截至2019 年 10 月使用 Kafka v2.3.0 和 Zookeeper v3.5.5

场景:

考虑以下场景,其中使用者可以根据以下假设从主题中获取记录;

  1. Fetch 的设计方式是,它可以在 1 次拍摄中获取N条记录,具体取决于每个容量的读取次数(以字节为单位(。这是以配置参数的形式实现和提供的,例如max.partition.fetch.bytes.
  2. Fetch 的设计方式是,它可以根据每个记录的提取来提取N条记录。因此,如果我将该参数设置为 1 ,这意味着它将在每次获取尝试中获取 1 条记录。这就是卡夫卡现在所缺少的。这可能是由于设计范例和对流式处理、并发、分区功能等的支持。

问题:

  1. 目前有什么方法可以让我每次提取仅获取 1 条记录并相应地工作?
  2. 我的应用程序要求实时流式传输记录和每次提取 1 条记录。两者都可以使用 Kafka 实现吗,还是我最终必须使用 2 种不同的消息传递系统,如 Kafka + ActiveMQ 等?

请指导我。提前非常感谢你。

一个 NPM 包kafkajs确实支持并帮助您使用fetch_per_record机制,下面是实现此目的的示例代码,

const { Kafka } = require('kafkajs')
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['192.168.1.172:9092']
})
const consumer = kafka.consumer({ groupId: 'test-group' })
const run = async () => {
await consumer.connect()
await consumer.subscribe({ topic: 'test', fromBeginning: false })
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
console.log({
partition,
offset: message.offset,
value: message.value.toString(),
})
},
})
}
run().catch(console.error);

在上面的代码中,您有eachMessage方法可以帮助我们实现此机制

最新更新