为多个主题创建一个 Kafka 使用者



我想为多个主题创建单个 kafka 使用者。消费者的方法构造函数允许我在订阅中传输主题列表的参数,如下所示:

private Consumer createConsumer() {
Properties props = getConsumerProps();
Consumer<String, byte[]> consumer = new KafkaConsumer<>(props);
ArrayList<String> topicMISL = new ArrayList<>();
for (String s:Connect2Redshift.kafkaTopics) {
topicMISL.add(systemID + "." + s);
}
consumer.subscribe(topicMISL);
return consumer;
}

private boolean consumeMessages( Duration duration, Consumer<String, byte[]> consumer) {
try {  Long start = System.currentTimeMillis();
ConsumerRecords<String, byte[]> consumerRecords = consumer.poll(duration);
}
}

之后,我想每 3 秒将 kafka 中的记录轮询到流中并处理它们,但我想知道这个消费者内部有什么 - 如何轮询来自不同主题的记录 - 首先是一个主题,然后是另一个主题,或者并行。会不会是一个消息量很大的主题会一直被处理,而另一个消息量很少的主题会等待?

通常取决于您的主题设置。Kafka 通过为每个主题使用多个分区进行扩展。

  • 如果你在 1 个主题上有 3 个分区,kafka 可以并行读取它们
  • 多个主题也是如此,阅读可以并行进行

如果某个分区接收的消息比其他分区多得多,则可能会遇到此特定分区的使用者滞后的情况。调整批量大小和使用者设置可能会对他们有所帮助,也可以压缩消息。 理想情况下,确保均匀分配负载可避免这种情况。

看看这篇博客文章,它让我对内部结构有了很好的理解:https://www.confluent.io/blog/configure-kafka-to-minimize-latency/

ConsumerRecords<String, String> records = consumer.poll(long value);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {

}

}

还需要通过使用consumer.commitSync查找偏移量和提交来提交偏移量

相关内容

最新更新