就像我在标题中说的那样,当生产者停止发送月经时,我想接收最后一条windowedBy消息。目前我是手动完成的,但首先是一个小的描述。
我有一个Kafka生产者,他正在从一个文件中读取行(每一行都是不同的jSon(,每个读取的行都发送给Kafka,时间段相差500毫秒。我只有120行(或jSons(。
我有一个消费者,他消费了生产者发送的所有jSons。代码:
final KStream<String, Aggregate> transactions = builder.stream(kafkaProperties.getTopic(), Consumed.with(Serdes.String(), aggregateSerde));
// Topology
transactions
.groupBy(this::groupedByTimeStampAndProtocolName)
.windowedBy( TimeWindows
.of( Duration.ofSeconds( 10 ))
.grace( Duration.ofMillis( 0 )))
.aggregate(
tool::emptyAggregate,
this::processNewRecord, //new TransactionAggregator(),
Materialized.<String, Aggregate, WindowStore<Bytes, byte[]>>as(TRANSACTION_AGGREGATE)
.withKeySerde(Serdes.String())
.withValueSerde(aggregateSerde)
)
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream()
.foreach(sendAggregatesToCassandra);
我有预期的功能,我的意思是,它接收所有记录,但要接收最后一条窗口消息,我必须手动发送记录。
关于此的两个问题:
- 有没有办法自动处理最后一个窗口?当制作人发送最后一张唱片(第120张jSon(时,制作人将不再发送更多唱片。我是该等时间还是其他什么都没关系
- 我看到我必须发送3条记录来处理最后一个窗口。我不清楚为什么我必须发送3条记录(如果我发送<3条记录,最后一个窗口没有完全消耗掉(。有没有办法只发送一条记录?是否更改缓冲区?更改某些属性
我在JDK 11中使用Kafka Streams(带spring(,我使用的是码头化的Kafka:
- confluentinc/cp卡夫卡:5.5.1
- 动物园管理员:3.4.14
- 卡夫卡:
<version.kafka>2.5.0</version.kafka>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${version.kafka}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${version.kafka}</version>
</dependency>
使用的Kafka属性包括:
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 127.0.0.1:9092);
props.put(StreamsConfig.APPLICATION_ID_CONFIG, kafkaProperties.getAppId()+Constants.APP_ID);
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Bytes().getClass());
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
在生产商方面:
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 127.0.0.1:9092);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.ACKS_CONFIG, "all");
求你了,你能帮我吗?
当您使用suppress()
(带有untilWindowCloses
配置(时,运算符只会在";流时间";预付款"流时间";是作为记录时间戳的函数计算的,因此,如果没有处理任何记录,";流时间";并且CCD_ 3将永远不会发射任何东西。因此,发送更多的记录是如何";流时间";可以提前。
注意:对于流式传输用例,假设数据永远不会停止,因此这对实际部署来说不是问题——像你这样从文件中读取并不是真正的流处理用例:我假设你从文件中进行读取是为了测试,在这种情况下,你的输入文件应该包含更多的记录来相应地提前流时间。
有关更多详细信息,请查看此博客文章:https://www.confluent.io/blog/kafka-streams-take-on-watermarks-and-triggers/
我还在卡夫卡峰会上做了一次关于这个话题的演讲:https://www.confluent.io/resources/kafka-summit-2020/the-flux-capacitor-of-kafka-streams-and-ksqldb/