我遇到了一些麻烦,了解如何为我消耗的每个记录手动正确投入。
首先,让我们从https://kafka.apache.org/090/javadoc/org/apache/kafka/kafka/clients/consumer/kafkaconsumer.html
>while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
insertIntoDb(buffer);
consumer.commitSync();
buffer.clear();
}
}
此示例仅在处理了民意调查中收到的所有记录后才提交。我认为这不是一个很好的方法,因为如果我们收到三个记录,并且我的服务在处理第二个记录时死亡,最终将再次消耗第一个记录,这是不正确的。
因此,有第二个示例涵盖按每分期分区进行记录:
try {
while(running) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
System.out.println(record.offset() + ": " + record.value());
}
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
} finally {
consumer.close();
}
但是,我认为这遇到了同样的问题,它只有在处理来自特定分区的所有记录后才提交。
我设法提出的解决方案是:
val consumer: Consumer<String, MyEvent> = createConsumer(bootstrap)
consumer.subscribe(listOf("some-topic"))
while (true) {
val records: ConsumerRecords<String, MyEvent> = consumer.poll(Duration.ofSeconds(1))
if (!records.isEmpty) {
mainLogger.info("Received ${records.count()} events from CRS kafka topic, with partitions ${records.partitions()}")
records.forEach {
mainLogger.debug("Record at offset ${it.offset()}, ${it.value()}")
processEvent(it.value()) // Complex event processing occurs in this function
consumer.commitSync(mapOf(TopicPartition(it.topic(), it.partition()) to OffsetAndMetadata (it.offset() + 1)))
}
}
}
现在,这在我进行测试时似乎有效。到目前为止,在我的测试期间,似乎只使用了一个分区(我已经通过记录记录来检查此分区。
这种方法会引起任何问题吗?消费者API似乎没有提供在不指定分区的情况下提交偏移的方法,这对我来说似乎有些奇怪。我在这里错过了什么吗?
没有对或错误的提交方式。这确实取决于您的用例和应用程序。
承诺每个偏移都会提供更多的颗粒状控制,但在性能方面具有含义。在频谱的另一侧,您可以每x秒(如自动提交一样(异步提交,而且开销很少,但控制范围很少。
在第一个示例中,事件是在批处理中处理和投入的。在性能方面很有趣,但是就错误而言,可以重新处理完整批次。
在第二个示例中,它也是批处理,但仅每个分区。这应该导致较小的批次,因此性能较少,但如果事情错误,则更少的后处理。
在上一个示例中,您选择提交每个消息。尽管这给出了最大的控制,但它会显着影响性能。此外,与其他情况一样,它不是完全错误的。
如果该应用程序在处理事件后崩溃,但在提交事件之前,重新启动最后一个事件后,可能会重新加入(即至少一次语义(。但至少只有一个事件应受到影响。
如果您需要一次语义,则需要使用交易生产者。