我想知道Kafka streams
的行为(我使用的是低级Java API(。我通过实现接口org.apache.kafka.streams.processor.Processor
来实现Stream processor
。
org.apache.kafka.streams.processor.Processor
具有以下方法:
void init(ProcessorContext上下文(;
void close((;
假设kafka主题有6个分区,并且在我的流应用程序中,它有num.stream.threads=1
,并且SIX instances of streaming application
在不同的机器中运行。因此,这意味着每个流线程将只分配一个分区。
假设其中一台机器崩溃,那么我们只剩下五台机器。现在,这将触发重新平衡,当它发生时,我有以下问题:
-
重新平衡时StreamThread是否会消亡?由于StreamThread是一个线程,所以在重新平衡过程中,它们是保持"活动"状态,还是所有Stream线程都被"杀死"并重新创建?
-
在创建StreamProcessor实例时调用init((/close((,还是在创建StreamThread时为每个重新平衡调用OR?基本上想知道这些方法在哪个阶段被调用,以及在创建处理器实例时或在创建流任务时重新平衡/创建流线程的关系。
-
如何以编程方式使客户端离开组?然而,我确实对此进行了搜索,得到了不相关的结果。
重新平衡时StreamThread会死吗?由于StreamThread是一个线程,所以在重新平衡过程中,它们是保持"活动"状态,还是所有Stream线程都被"杀死"并重新创建?
不,线程保持活动状态。(只有崩溃的机器上的线程显然是死的。(
在创建StreamProcessor实例时调用init((/close((,还是在创建StreamThread时为每个重新平衡调用OR?基本上想知道这些方法在哪个阶段被调用,以及在创建处理器实例时或在创建流任务时重新平衡/创建流线程的关系。
这取决于版本。在旧版本(2.3.x或更早版本(中,在重新平衡期间,所有任务都将暂停(即暂停(,因此将调用close()
。如果恢复(或迁移并重新创建(现有任务,则调用init()
。因此,基本上,当StreamThread
启动时,它首先触发重新平衡,在分配分区后,创建任务并进行相应的init()
调用。对于现有的StreamThreads
,当触发重新平衡时,所有任务都将挂起(即调用close()
(,并重新分配以及重新启动新任务。
在较新的版本(2.4.x及更新版本(中,会进行增量再平衡,因此在再平衡过程中任务不再暂停。只有当任务从一个StreamThread
迁移到另一个时,该任务才会在一个线程上关闭,并在新线程上重新初始化。
如何以编程方式使客户端离开组?然而,我确实对此进行了搜索,得到了不相关的结果。
不确定你的确切意思。但是,您可以调用KafkaStreams#close()
来停止其所有本地StreamThreads
,因此这些线程最终会离开组。