如何删除 GlobalKTable 存储的状态



我想删除我的GlobalKTable<Integer, Long>存储的状态。

我试图通过以下方式删除状态:

  • 通过调用 kafka-topics.sh --zookeeper localhost:8080 --delete --topic my-topic 删除整个 Kafka 主题
  • 在应用程序启动时运行KafkaStreams.cleanUp()
  • 向我的流生成"Test:null"消息,因为空值应被视为存储中的 DELETE 语句,如此处所述。但是,我的流应用程序失败,因为无法将空值反序列化为 LONG。

请参阅以下例外:

org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
        at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:74)
        at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:91)
        at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
        at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:546)
        at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:920)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:821)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8

如何删除我的 GlobalKTable 的状态?

我找到了解决这个问题的方法。要删除 GlobalKTable 的整个状态,还需要清除 RocksDB 缓存文件。

该RocksDB文件存储在位置StreamsConfig.STATE_DIR_CONFIG。我已经删除了文件,现在我的状态已完全清理。

不过,可能有更好的解决方案可以做到这一点?

KafkaStreams.cleanUp()实际上应该删除这些RocksDB文件。这是一个在即将发布的 1.1 版本中修复的错误:issues.apache.org/jira/browse/KAFKA-6259。

相关内容

  • 没有找到相关文章

最新更新