我想删除我的GlobalKTable<Integer, Long>
存储的状态。
我试图通过以下方式删除状态:
- 通过调用
kafka-topics.sh --zookeeper localhost:8080 --delete --topic my-topic
删除整个 Kafka 主题 - 在应用程序启动时运行
KafkaStreams.cleanUp()
- 向我的流生成"Test:null"消息,因为空值应被视为存储中的 DELETE 语句,如此处所述。但是,我的流应用程序失败,因为无法将空值反序列化为 LONG。
请参阅以下例外:
org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:74)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:91)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:546)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:920)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:821)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8
如何删除我的 GlobalKTable 的状态?
我找到了解决这个问题的方法。要删除 GlobalKTable 的整个状态,还需要清除 RocksDB 缓存文件。
该RocksDB文件存储在位置StreamsConfig.STATE_DIR_CONFIG
。我已经删除了文件,现在我的状态已完全清理。
不过,可能有更好的解决方案可以做到这一点?
KafkaStreams.cleanUp()
实际上应该删除这些RocksDB文件。这是一个在即将发布的 1.1 版本中修复的错误:issues.apache.org/jira/browse/KAFKA-6259。