流处理器的close()init()方法:重新平衡发生时流线程的行为



我想知道Kafka streams的行为(我使用的是低级Java API(。我通过实现接口org.apache.kafka.streams.processor.Processor来实现Stream processor

org.apache.kafka.streams.processor.Processor具有以下方法:

void init(ProcessorContext上下文(;

void close((;

假设kafka主题有6个分区,并且在我的流应用程序中,它有num.stream.threads=1,并且SIX instances of streaming application在不同的机器中运行。因此,这意味着每个流线程将只分配一个分区。

假设其中一台机器崩溃,那么我们只剩下五台机器。现在,这将触发重新平衡,当它发生时,我有以下问题:

  • 重新平衡时StreamThread是否会消亡?由于StreamThread是一个线程,所以在重新平衡过程中,它们是保持"活动"状态,还是所有Stream线程都被"杀死"并重新创建?

  • 在创建StreamProcessor实例时调用init((/close((,还是在创建StreamThread时为每个重新平衡调用OR?基本上想知道这些方法在哪个阶段被调用,以及在创建处理器实例时或在创建流任务时重新平衡/创建流线程的关系。

  • 如何以编程方式使客户端离开组?然而,我确实对此进行了搜索,得到了不相关的结果。

重新平衡时StreamThread会死吗?由于StreamThread是一个线程,所以在重新平衡过程中,它们是保持"活动"状态,还是所有Stream线程都被"杀死"并重新创建?

不,线程保持活动状态。(只有崩溃的机器上的线程显然是死的。(

在创建StreamProcessor实例时调用init((/close((,还是在创建StreamThread时为每个重新平衡调用OR?基本上想知道这些方法在哪个阶段被调用,以及在创建处理器实例时或在创建流任务时重新平衡/创建流线程的关系。

这取决于版本。在旧版本(2.3.x或更早版本(中,在重新平衡期间,所有任务都将暂停(即暂停(,因此将调用close()。如果恢复(或迁移并重新创建(现有任务,则调用init()。因此,基本上,当StreamThread启动时,它首先触发重新平衡,在分配分区后,创建任务并进行相应的init()调用。对于现有的StreamThreads,当触发重新平衡时,所有任务都将挂起(即调用close()(,并重新分配以及重新启动新任务。

在较新的版本(2.4.x及更新版本(中,会进行增量再平衡,因此在再平衡过程中任务不再暂停。只有当任务从一个StreamThread迁移到另一个时,该任务才会在一个线程上关闭,并在新线程上重新初始化。

如何以编程方式使客户端离开组?然而,我确实对此进行了搜索,得到了不相关的结果。

不确定你的确切意思。但是,您可以调用KafkaStreams#close()来停止其所有本地StreamThreads,因此这些线程最终会离开组。

最新更新