我正在将KafkaStreams 0.10.2.1与Windowed RocksDB状态存储一起使用,在状态存储初始化过程中我看到了一个非常奇怪的行为。在每个任务的状态存储文件夹中,KafkaStreams正在创建和删除包含RocksDB文件的文件夹,持续30分钟。
如果状态存储名为XXX,那么我看到在名为的文件夹中创建文件夹
State Folder/Task ID/XXX
名称如
XXX-201710211345
包含RocksDB文件。创建这些文件夹,然后删除,并创建具有不同时间戳的新文件夹。这种情况持续30分钟,直到消息处理完成。我猜RocksDB正在从状态存储的更改日志主题中重建所有历史状态,但我不明白它的目的是什么,因为它最终删除了除最后一个之外的所有状态。
KafkaStreams创建和删除这些文件夹的原因是什么?
如何使KafkaStreams只重新创建最新状态?
这是我的拓扑结构的精简版本:
stream
.map((key, value) -> KeyValue.pair(key, value))
.through(Serdes.String(), serde, MY_TOPIC)
.groupByKey(Serdes.String(), serde)
.count(TimeWindows.of(TimeUnit.SECONDS.toMillis(windowDurationSec)).until(TimeUnit.SECONDS.toMillis(windowDurationSec) + TimeUnit.SECONDS.toMillis(lateEventGraceTimeSec)), "Hourly_Agg")
.foreach((k, v) -> System.out.println(""));
这是来自strace的一个(小部分)转储:
6552 stat("/path/Prod/kafka-streams/Counter-V13/3_131/Hourly_Agg/Hourly_Agg-201710211230/000006.sst", {st_mode=S_IFREG|0644, st_size=3158, ...}) = 0
6552 unlink("/path/Prod/kafka-streams/Counter-V13/3_131/Hourly_Agg/Hourly_Agg-201710211230/000006.sst") = 0
6552 unlink("/path/Prod/kafka-streams/Counter-V13/3_131/Hourly_Agg/Hourly_Agg-201710211230") = -1 EISDIR (Is a directory)
6552 rmdir("/path/Prod/kafka-streams/Counter-V13/3_131/Hourly_Agg/Hourly_Agg-201710211230") = 0
6552 stat("/path/Prod/kafka-streams/Counter-V13/3_131/Hourly_Agg", {st_mode=S_IFDIR|0755, st_size=4096, ...}) = 0
6552 mkdir("/path/Prod/kafka-streams/Counter-V13/3_131/Hourly_Agg/Hourly_Agg-201710211500", 0755) = 0
6552 rename("/path/Prod/kafka-streams/Counter-V13/3_131/Hourly_Agg/Hourly_Agg-201710211500/LOG", "/path/Prod/kafka-streams/Counter-V13/3_131/Hourly_Agg/Hourly_Agg-201710211500/LOG.old.1508746634575191") = -1 ENOENT (No such file or directory)
Kafka Streams确实重新创建了最新的状态,您看到的行为是经过设计的。
对于窗口存储,窗口保留时间段被划分为所谓的段,Streams每个段使用一个RocksDB来存储相应的数据。这允许根据时间进度"滚动"分段,并有效删除早于保留时间的数据(即,丢弃孔分段/RocksDB)。
当状态被重新创建时,我们只需阅读整个变更日志主题,并将所有这些更新应用到存储中。因此,您可以看到与处理过程中相同的分段滚动行为(只是在更小的时间范围内)。由于没有足够的前期信息,"跳"到最后一个状态是不容易的——因此,盲目地重播变更日志是最好的选择。