KAFKA如何使用主题/分区/偏移来确切地实现一致消息传递逻辑



我设法在用@kafkalistener注释的方法中获取主题/分区/偏移,但是我如何使用这些数据实现精确的消费者逻辑?

我使用的是ConcurrentKalistEnerContainerFactory使用Concurrenc = 4的设置,并将AckMode设置为手动。我目前的方法是使用Redis进行reful:我使用主题:分区为redis键,偏移为其值,然后将偏移量与redis中的值进行比较,如果偏移量(大于Redis更大(,然后继续使用业务逻辑,否则我会忽略该消息。最后提交偏移(ack.acknowledge(((

但是,例如,例如,如果重新平衡发生在ack.acknowledge((完成之前,则会出现此错误:org.apache.kafka.clients.consumer.consumer.commitfailedexception,

重新平衡后,将原始分区分配给另一个线程,这会导致相同的消息消耗两次。

因此,一个单词,如何设计一个可以使每个kafka消息交付的逻辑?

您必须在kafka之外写下原子处理的最后一个偏移以及处理的结果。这可以是数据库或文件,只是不做两个写作,使其成为数据和偏移的单个原子写入。如果您的消费者崩溃和IT或其他实例重新启动或接管,则需要确保首先读取最后一个处理结果的偏移量,然后在poll((之前先搜索((到该位置以获取更多消息。这就是现有的Kafka水槽连接器今天可以实现EOS消耗的多少。

kafka尚不完全支持一次。它将在0.11.0.0发行版中提供:https://issues.apache.org/jira/browse/kafka-4923此版本计划于2017年6月14日,因此您可以自己等待或构建复杂的逻辑; - (

最新更新