我显然是kafka/kafka流的初学者。我只需要阅读几个主题中给定的消息,给定它们的id。虽然我们的实际拓扑结构相当复杂,但这个Stream应用程序只需要实现这个简单的目标
商店就是这样创建的:
final StreamsBuilder streamsBuilder = new StreamsBuilder();
streamsBuilder.table(
topic,
Materialized.<String, String>as( persistentKeyValueStore(storeNameOf(topic)))
.withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
.withCachingDisabled());
// Materialized.<String, String>as( inMemoryKeyValueStore(storeNameOf(topic)))
// .withKeySerde(Serdes.String()).withValueSerde(Serdes.String())
// .withCachingDisabled());
);
KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), new Properties() {{ /** config items go here**/ }})
kafkaStreams.start();
//logic for awaiting kafkaStreams to reach `RUNNING` state as well as InvalidStateStoreException handling (by retrying) is ommited for simplicity :
ReadOnlyKeyValueStore<String, String> replyStore = kafkaStreams.store(storeNameOf(topicName), QueryableStoreTypes.keyValueStore());
因此,当使用注释的inMemoryKeyValueStore
时,成功创建了replyStore
,我可以在中查询值,而不会出现问题
使用persistentKeyValueStore
,最后一行使用java.lang.IllegalStateException: KafkaStreams is not running. State is ERROR
失败。注意,在store
调用之前,我确实检查了KafkaStreams是否处于状态RUNNING
;ERROR状态是在调用中以某种方式达到的。
你认为我在建立持久存储时可能遗漏了什么吗?调试提示也会有很大帮助,我不得不承认,我很困在这里谢谢
编辑:执行发生在docker容器下。这是非常相关的,但我承诺添加首字母
正如Matthias J.Sax在评论表单中指出的那样,为了调试这个问题,uncaughtExceptionHandler注册起到了很大的帮助。
实际问题是由于RocksDB和我使用的docker映像之间的不兼容(因此从openjdk:8-jdk-alpine
更改为anapsix/alpine-java:8
(
相关:https://issues.apache.org/jira/browse/KAFKA-4988不满意的链接错误:/tmp/snappy1.1.4-libsnappyjava.so加载共享库ld-linux-x86-64.so.2时出错:没有这样的文件或目录