我创建了一个Kafka主题并向它发送了一些消息。
我创建了具有流拓扑的应用程序。基本上,它是在做总和,并将其具体化到当地的一家州立商店
我看到新目录是在我配置的状态文件夹中创建的
我可以从当地的州立商店中读取金额
到目前为止一切都很好
然后,我关闭了正在运行流的应用程序
我删除了在状态文件夹中创建的目录
我重新启动Kafka集群
我重新启动具有流拓扑结构的应用程序
据我所知,这个州已经不复存在了。卡夫卡需要再次进行聚合。但事实并非如此。我仍然能够得到之前的总和结果。怎么回事?卡夫卡在哪里拯救了当地的国营商店?
这是我的代码
Reducer<Double> reduceFunction = (subtotal, amount) -> {
// detect when the reducer is triggered
System.out.println("reducer is running to add subtotal with amount..." + amount);
return subtotal + amount;
};
groupedByAccount.reduce(reduceFunction,
Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as(BALANCE).withValueSerde(Serdes.Double()));
我在reduceFunction中显式地放入了System.out。每当它被执行时,我都会在控制台上看到它
但在重新启动kafka集群和我的应用程序后,我没有看到任何
卡夫卡真的恢复了状态吗?或者它在其他地方拯救了国家?
如果我没有错的话,根据Ben Stopford的《设计事件驱动系统》(免费书籍(第137页,它指出:
我们可以存储,这些状态存储中的统计数据,它们将保存在本地并备份到卡夫卡,使用所谓的变更日志主题,继承了卡夫卡的所有耐久性担保。
您的状态存储的副本似乎也备份在Kafka中(即更改日志主题(。我不认为重新启动集群会清空(或删除(主题中已经存在的消息,因为它们在Zookeeper中被跟踪。因此,一旦重新启动集群和应用程序,本地状态存储就会从Kafka中恢复。