了解kafka流中的max.task.idle.ms,以使用KStream-Ktable Join



我需要帮助理解kafka流的行为当max.task.idle.ms在kafka 2.2中使用。

我有一个kStream-ktable加入,其中kstream被重新钥匙:

KStream stream1 = builder.stream("topic1", Consumed.with(myTimeExtractor));
KStream stream2 = builder.stream("topic2", Consumed.with(myTimeExtractor));
KTable table = stream1
       .groupByKey()
       .aggregate(myInitializer, myAggregator, Materialized.as("myStore"))
stream2.selectKey((k,v)->v)
       .through("rekeyedTopic")
       .join(table, myValueJoiner)
       .to("enrichedTopic");

所有主题都有10个分区,用于测试,我将max.task.idle.ms设置为2分钟。MyTimeExtractor仅在标记为"快照"时才更新消息的事件时间:Stream1中的每个快照消息将其事件时间设置为某个常数t,stream2中的消息将其事件时间设置为t 1。

主题1中的每个消息和主题2中都有200条消息。我可以看到,在一秒钟左右的时间内,神秘店和重新介绍都被填补了。由于表中消息的事件时间低于流中消息的事件时间,我的理解(从阅读https://cwiki.apache.org/confluence/display/display/kafka/kip-353: kafka streams timestamp 同步(是,在填充了梅斯托尔和重新介绍后不久,我应该看到联接的结果(富集(。实际上,我应该能够首先填充重新介绍,并且只要在此之后不到2分钟的情况下,连接量仍应产生预期的结果。

这不是发生的事情。发生的事情是,神秘和重新介绍在第一秒钟左右的时间内填满了,然后两分钟都没有发生,只有那时,富集的电视才充满了预期的消息。

我不明白为什么富集的时间很久以前就已经"准备好"了,为什么要停下2分钟。我缺少什么?

基于其说明的文档:

max.task.idle.ms-最大的时间量时流任务时会保持空闲 其所有分区缓冲区都包含记录,以避免潜在的排序 跨多个输入流的记录处理。

我会说这可能是由于某些没有包含记录的分区缓冲区,因此基本上等待避免订单处理直至您已配置为属性的定义时间。

相关内容

  • 没有找到相关文章

最新更新