处理从卡夫卡检索到的每个记录后,提交的正确方法是什么?



我遇到了一些麻烦,了解如何为我消耗的每个记录手动正确投入。

首先,让我们从https://kafka.apache.org/090/javadoc/org/apache/kafka/kafka/clients/consumer/kafkaconsumer.html

>
while (true) {
     ConsumerRecords<String, String> records = consumer.poll(100);
     for (ConsumerRecord<String, String> record : records) {
         buffer.add(record);
     }
     if (buffer.size() >= minBatchSize) {
         insertIntoDb(buffer);
         consumer.commitSync();
         buffer.clear();
     }
 }

此示例仅在处理了民意调查中收到的所有记录后才提交。我认为这不是一个很好的方法,因为如果我们收到三个记录,并且我的服务在处理第二个记录时死亡,最终将再次消耗第一个记录,这是不正确的。

因此,有第二个示例涵盖按每分期分区进行记录:

try {
     while(running) {
         ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
         for (TopicPartition partition : records.partitions()) {
             List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
             for (ConsumerRecord<String, String> record : partitionRecords) {
                 System.out.println(record.offset() + ": " + record.value());
             }
             long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
             consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
         }
     }
 } finally {
   consumer.close();
 }

但是,我认为这遇到了同样的问题,它只有在处理来自特定分区的所有记录后才提交。

我设法提出的解决方案是:

        val consumer: Consumer<String, MyEvent> = createConsumer(bootstrap)
        consumer.subscribe(listOf("some-topic"))
        while (true) {
            val records: ConsumerRecords<String, MyEvent> = consumer.poll(Duration.ofSeconds(1))
            if (!records.isEmpty) {
                mainLogger.info("Received ${records.count()} events from CRS kafka topic, with partitions ${records.partitions()}")
                records.forEach {
                    mainLogger.debug("Record at offset ${it.offset()}, ${it.value()}")
                    processEvent(it.value()) // Complex event processing occurs in this function
                    consumer.commitSync(mapOf(TopicPartition(it.topic(), it.partition()) to OffsetAndMetadata (it.offset() + 1)))
                }
            }
        }

现在,这在我进行测试时似乎有效。到目前为止,在我的测试期间,似乎只使用了一个分区(我已经通过记录记录来检查此分区。

这种方法会引起任何问题吗?消费者API似乎没有提供在不指定分区的情况下提交偏移的方法,这对我来说似乎有些奇怪。我在这里错过了什么吗?

没有对或错误的提交方式。这确实取决于您的用例和应用程序。

承诺每个偏移都会提供更多的颗粒状控制,但在性能方面具有含义。在频谱的另一侧,您可以每x秒(如自动提交一样(异步提交,而且开销很少,但控制范围很少。


在第一个示例中,事件是在批处理中处理和投入的。在性能方面很有趣,但是就错误而言,可以重新处理完整批次。

在第二个示例中,它也是批处理,但仅每个分区。这应该导致较小的批次,因此性能较少,但如果事情错误,则更少的后处理。

在上一个示例中,您选择提交每个消息。尽管这给出了最大的控制,但它会显着影响性能。此外,与其他情况一样,它不是完全错误的。

如果该应用程序在处理事件后崩溃,但在提交事件之前,重新启动最后一个事件后,可能会重新加入(即至少一次语义(。但至少只有一个事件应受到影响。

如果您需要一次语义,则需要使用交易生产者。

最新更新