Kafka 通过多个分区和多个使用者线程提高吞吐量



我正在为某些应用程序使用 kafka 流。

流流量如下所示

kafkaProducer---->StreamerConsumer1->finalCosumer

我有生产者,它写入数据的速度非常快,我的 StreamConsumer 将使用一些进程映射每个流并将流转发到其他主题。

在我的 StreamCosumer 地图中,我添加了自己的映射器函数,该函数实际上尝试持久化其相关数据,如下所示

public void checkRecord(T1 key, T2 value) {
switch(T1.toString()){
case "key1":
//Get relavant fileds from value and perisit in db 
break;
case "key2":
//Get relavant fileds from value and perisit in db 
break;
}
}

KStream<String, KafkaStatusRecordWrapper> pDStream[] = myStream.map(this::checkRecord).branch((key, value)-> value.isSuccess(),(key, value)-> !value.isSuccess());
pDStream[0].mapValues(value -> transformer(value)).to("other_topic",Produced.with(stringSerde, stringSerde));   

现在我的 checkRecord 记录消费者函数是单线程的,几乎需要 300 毫秒(由于一些业务逻辑和我无法避免的数据库持久性)才能返回。

我不能增加分区的数量,因为我们的基础设施有一些限制,也由于以下限制

More Partitions Requires More Open File Handles
More Partitions May Increase Unavailability
More Partitions May Increase End-to-end Latency

所以我打算编写多线程流消费者。

但我担心以下几点。

  1. 我只需要处理一次记录
  2. 移交给另一个线程将导致偏移管理出现问题。

那么如何提高吞吐量呢?

我的消费者有足够的资源,只有 40% 的资源被使用。

您可以设置流配置num.stream.threads来配置线程数。最大值可以是最大分区数。它有助于提高应用程序实例的并行性。

假设您的主题有 4 个分区,您可以设置以下内容:

properties.set("num.stream.threads",4);

相关内容

  • 没有找到相关文章

最新更新