如果我使用persistentKeyValueStore,Kafka Streams就会停止,但与inMemoryKey



我显然是kafka/kafka流的初学者。我只需要阅读几个主题中给定的消息,给定它们的id。虽然我们的实际拓扑结构相当复杂,但这个Stream应用程序只需要实现这个简单的目标

商店就是这样创建的:

final StreamsBuilder streamsBuilder = new StreamsBuilder();
streamsBuilder.table(
topic,
Materialized.<String, String>as( persistentKeyValueStore(storeNameOf(topic)))
.withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
.withCachingDisabled());
//      Materialized.<String, String>as( inMemoryKeyValueStore(storeNameOf(topic)))
//                  .withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
//                  .withCachingDisabled());
);
KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), new Properties() {{ /** config items go here**/ }})
kafkaStreams.start();
//logic for awaiting kafkaStreams to reach `RUNNING` state as well as InvalidStateStoreException handling (by retrying) is ommited for simplicity :   
ReadOnlyKeyValueStore<String, String> replyStore = kafkaStreams.store(storeNameOf(topicName), QueryableStoreTypes.keyValueStore());

因此,当使用注释的inMemoryKeyValueStore时,成功创建了replyStore,我可以在中查询值,而不会出现问题

使用persistentKeyValueStore,最后一行使用java.lang.IllegalStateException: KafkaStreams is not running. State is ERROR失败。注意,在store调用之前,我确实检查了KafkaStreams是否处于状态RUNNING;ERROR状态是在调用中以某种方式达到的。

你认为我在建立持久存储时可能遗漏了什么吗?调试提示也会有很大帮助,我不得不承认,我很困在这里谢谢

编辑:执行发生在docker容器下。这是非常相关的,但我承诺添加首字母

正如Matthias J.Sax在评论表单中指出的那样,为了调试这个问题,uncaughtExceptionHandler注册起到了很大的帮助。

实际问题是由于RocksDB和我使用的docker映像之间的不兼容(因此从openjdk:8-jdk-alpine更改为anapsix/alpine-java:8(

相关:https://issues.apache.org/jira/browse/KAFKA-4988不满意的链接错误:/tmp/snappy1.1.4-libsnappyjava.so加载共享库ld-linux-x86-64.so.2时出错:没有这样的文件或目录

相关内容

  • 没有找到相关文章

最新更新