假设我有一个基本的KafkaStreams应用程序,它有一个主题(有多个分区)和一个处理器类型,处理消息如下:
builder.stream(topic)
.process(() -> new MyProcessor());
是否会出现以下情况?对于MyProcessor的特定实例,比如M(即通过调用处理器供应商获得的特定java对象),对于主题的特定分区,比如P,
- 在某个时间t1,M接收来自P的消息
- 在稍后的时间点t2,P从M撤销,因此M不再接收来自P的消息(例如,因为启动了一个额外的工作程序来处理P/em>)
- 稍后,t3,M再次接收来自P的消息
我查看了流任务如何与Kafka主题分区相关的文档,但没有找到关于这与处理器实例的构建和删除以及/或在重新平衡发生时将主题分区(取消)分配给现有处理器相关的详细信息。
在Kafka Streams中;处理单位";称为流任务。
任务可以是有状态的和/或无状态的。当重新平衡事件发生时,在应用程序的一个实例(例如M
)上运行的任务可能会移动到应用程序的另一个实例。
主题分区和流任务之间有一个1-1的映射,这保证了一个并且只有一个任务将处理来自特定分区的数据。例如,如果任务3负责从分区P
中读取和处理,那么当任务3从实例M
移动到另一个实例M'
时,则M
将停止读取P
(因为它不再运行任务3),而M'
(任务3现在运行的地方)将恢复/继续处理P
。
- 在某个时间t1,M从P接收消息
假设负责处理主题分区P
的流任务称为task(P)
。在时间t1,M
恰好是正在运行task(P)
的应用程序实例。这就是上面第1点的情况。
- 在稍后的t2点,P从M撤销,因此M不再接收来自P的消息(例如,因为启动了一个处理P的额外工作程序)
这里,应用程序的另一个实例(您将此实例称为"额外工作者")将负责运行task(P)
。这里,task(P)
将自动从原始应用程序实例M
迁移到新实例M'
。由task(P)
管理的任何状态(例如,当任务正在执行诸如联接或聚合之类的有状态操作时)当然都将与任务一起迁移。当task(P)
被迁移时,从主题分区P
读取和处理的责任也将从应用程序实例M
转移到M'
。
也许不要想太多";哪个应用实例正在处理主题分区P
"相反,特定的分区总是由特定的流任务处理,流任务可能会在应用程序实例之间移动。(当然,Kafka的Streams API将防止不必要的任务迁移,以确保应用程序的处理保持高效。)
- 在稍后的时间点t3,M再次从P接收消息
这意味着,在时间t3,由于另一个重新平衡事件,M
再次被分配了任务task(P)
——可能是因为其他应用程序实例M'
被取下,或者发生了其他需要任务迁移的事情。
在对这个答案的评论中被问到:尽管有一两句关于州迁移的话也很有用。这不像二进制/物理数据是从一个RocksDB实例中获取并传递给另一个实例的。显然,该状态是基于容错机制重建的。
有状态任务使用状态存储来存储持久状态信息。这些状态存储是容错的。这些状态存储的真相来源于Kafka本身:对状态存储的任何更改(例如,递增计数器)都以流式方式备份到Kafka中——类似于将数据库表的CDC流存储到Kaf卡主题中(这些是正常主题,但通常称为"更改日志主题")。然后,当一个任务死亡或迁移到另一个容器/VM/机器时,任务的状态存储将通过读取Kafka(想想:流式备份/流式恢复)恢复到任务的新容器/VM//机器中。这样可以将状态存储恢复到原始容器上的状态,而不会出现任何数据丢失或重复。
流任务使用RocksDB在本地实现状态存储(如任务的容器中),以达到优化目的。将这些本地RocksDB实例视为缓存,在数据安全方面可以丢失,因为状态数据的持久存储是Kafka,如上所述。