Kafka 流:处理器有时会在应用程序重新启动时处理相同的消息



在我的Java应用程序中,我有一个Kafka处理器。

我的处理方法如下所示:

@Override
public void process(String key, String value) {
    System.out.println("In the process method, the offset is: " + context.offset());
    //Some more code
}

其中上下文是来自 init 方法的 ProcessorContext。

我启动应用程序并记录:

In the process method, the offset is: 1203
In the process method, the offset is: 1204

然后我再次启动应用程序,并收到相同的消息。在应用程序重新启动几次后(或一段时间后,我找不到模式(,进程方法将停止调用,并且我不再在应用程序启动时收到这些消息。

知道为什么这些消息有时会被多次处理吗?

我的流配置具有以下属性:

props.put(StreamsConfig.APPLICATION_ID_CONFIG, "someId");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName());
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 10);
props.put(StreamsConfig.STATE_DIR_CONFIG, "somedir");

编辑

下面的代码片段显示了我如何创建KafkaStreams:

public class KafkaStreamsProcessorBean implements SmartLifecycle {
    @Override
    public synchronized void start() {
        final KStreamBuilder builder = new KStreamBuilder();
        final KStream<String, String> debeziumStream = builder.stream("debezium.topic");
        debeziumStream.process(() -> debeziumProcessor);
        kafkaStreams = new KafkaStreams(builder, streamsConfig);
        kafkaStreams.start();
    }
}

这里 streamsConfig 是具有我所展示的属性的配置,debeziumProcessor 是第一个代码片段中的 Kafka 处理器。

默认情况下,

Kafka Streams 处理保证至少为一次。这意味着可以重新处理消息。

在您的情况下,即使您将StreamsConfig.PROCESSING_GUARANTEE_CONFIG设置为 StreamsConfig.EXACTLY_ONCE,您也可以在重新启动后看到相同的日志(具有相同的偏移量信息(。

处理保证是关于在一个事务中将偏移量结果写入主题。这并不意味着消息不能多次处理(使用相同的键和值多次调用处理器::p rocess(...(。

可能出现以下情况:

  • 消息已读。
  • Processor::process(...)被召唤了。
  • 应用程序已完成,但未写入偏移量。
  • 重新启动后,应用程序将读取相同的消息,并将调用相同键和值的Processor::process(...)

相关内容

  • 没有找到相关文章

最新更新