我使用了一个利用WALL_CLOCK_TIME
标点符号的处理器,我注意到在重新平衡阶段之后,init()
方法被多次调用用于同一任务。
我init()
记录此行:
log.info("In processor init, taskId is {}, cancellable is {}", context.taskId(), statisticsSending);
在日志中,我可以看到它被调用了两次:
07:53:15 信息 - 在处理器中 init,taskId 为 1_0,可取消为 org.apache.kafka.streams.processor.internals.PunctuationSchedule$RepointableCancellable@11a53ebd
07:53:15 信息 - 在处理器中 init,taskId 为 1_0,可取消为 org.apache.kafka.streams.processor.internals.PunctuationSchedule$RepointableCancellable@7770d7b7
此外,我记录了close()
方法中发生的事情,我看到那里取消了Cancellable
......
07:53:15 信息 - 关闭可取消 org.apache.kafka.streams.processor.internals.PunctuationSchedule$RepointableCancellable@11a53ebd
。从其身份哈希码(11a53ebd)来看,现有的处理器已被重复使用,但也创建了一个新的处理器。因此,我的定期任务被安排了两次,而不是一次。
我以为每个任务只有一个处理器。任何想法可能导致这种行为的原因,我如何防止它发生?
在重新平衡期间,所有Processor
都将关闭,然后在重新平衡后再次初始化。这是确保不会丢失任何数据所必需的。
但是,您看到的哈希引用注册的标点符号,而不是Processor
对象。因此,如果您cancel
标点符号close
并在init()
中schedule
标点符号,则旧时间表将被新时间表替换。