Processor.init() 在 Kafka Stream 中为单个任务多次调用



我使用了一个利用WALL_CLOCK_TIME标点符号的处理器,我注意到在重新平衡阶段之后,init()方法被多次调用用于同一任务。

init()记录此行:

log.info("In processor init, taskId is {}, cancellable is {}", context.taskId(), statisticsSending);

在日志中,我可以看到它被调用了两次:

07:53:15 信息 - 在处理器中 init,taskId 为 1_0,可取消为 org.apache.kafka.streams.processor.internals.PunctuationSchedule$RepointableCancellable@11a53ebd

07:53:15 信息 - 在处理器中 init,taskId 为 1_0,可取消为 org.apache.kafka.streams.processor.internals.PunctuationSchedule$RepointableCancellable@7770d7b7

此外,我记录了close()方法中发生的事情,我看到那里取消了Cancellable......

07:53:15 信息 - 关闭可取消 org.apache.kafka.streams.processor.internals.PunctuationSchedule$RepointableCancellable@11a53ebd

。从其身份哈希码(11a53ebd)来看,现有的处理器已被重复使用,但也创建了一个新的处理器。因此,我的定期任务被安排了两次,而不是一次。

我以为每个任务只有一个处理器。任何想法可能导致这种行为的原因,我如何防止它发生?

在重新平衡期间,所有Processor都将关闭,然后在重新平衡后再次初始化。这是确保不会丢失任何数据所必需的。

但是,您看到的哈希引用注册的标点符号,而不是Processor对象。因此,如果您cancel标点符号close并在init()schedule标点符号,则旧时间表将被新时间表替换。

相关内容

  • 没有找到相关文章

最新更新