我最近创建了我的第一个Kafka流学习应用程序。我使用了spring-cloud-stream-kafka-binding。这是一个简单的电子商务系统,我正在阅读一个名为"产品"的主题,每当有新库存的产品进入时,它就会包含所有的产品条目。我要把数量加起来,得到一种产品的总数量。
我有两个选择
- 发送聚合详细信息(KTable)到另一个kafka主题aggregating -products
- 具体化聚合数据
我选择了第二个选项,我发现应用程序自己创建了一个kafka主题,当我从该主题中消费消息时,然后得到聚合的消息。
.peek((k,v) -> LOGGER.info("Received product with key [{}] and value [{}]",k, v))
.groupByKey()
.aggregate(Product::new,
(key, value, aggregate) -> aggregate.process(value),
Materialized.<String, Product, KeyValueStore<Bytes, byte[]>>as(PRODUCT_AGGREGATE_STATE_STORE).withValueSerde(productEventSerde)//.withKeySerde(keySerde)
// because keySerde is configured in application.properties
);
使用<<p> strong> InteractiveQueryService ,我可以在我的应用程序中访问这个状态存储,以查找产品的可用总量。
现在有几个问题-
- 为什么应用程序创建了一个新的kafka主题?
- 如果答案是"存储聚合数据",那么这与选项1有什么不同,我可以自己发送聚合数据?
- RocksDB在哪里出现?
我的应用程序的代码(它比我在这里解释的更多)可以从这个链接访问-
https://github.com/prashantbhardwaj/kafka-stream-example/blob/master/src/main/java/com/appcloid/kafka/stream/example/config/SpringStreamBinderTopologyBuilderConfig.java
内部主题称为changelog主题,用于容错。聚合状态使用RocksDB
本地存储在磁盘上,也以变更日志主题的形式存储在Kafka代理上——这本质上是一个"备份"。如果任务被转移到新机器上,或者本地状态由于其他原因丢失,Kafka Streams可以通过从changelog主题中读取对原始状态的所有更改并将其应用到新的RocksDB实例中来恢复本地状态。恢复完成后(处理了整个changelog主题),新机器上应该是相同的状态,新机器可以在旧机器停止的地方继续处理。这有很多复杂的细节(例如,在默认设置中,当失败发生时,可能会发生相同输入记录的状态更新两次)。
参见https://developer.confluent.io/learn-kafka/kafka-streams/stateful-fault-tolerance/