Apple M1 - RocksDBException causes error opening store: Column family not found when joining a KStream to a KTable



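I am trying to left-join events from two streams. Initially I joined two KStreams and everything worked. However, when I convert the second stream to a KTable, I get an error. Here is the code with the second stream converted to a KTable: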

@Bean
public KafkaStreams kafkaStreams() throws IOException {
final Properties props = configureKafkaStreamsProperties();

ObjectMapper mapper = new ObjectMapper();
mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
final StreamsBuilder builder = new StreamsBuilder();
// 1st Structured stream
KStream<String, String> firstStream = builder.stream("topic-1", Consumed.with(Serdes.String(), Serdes.String()));

KStream<String, String> firstStreamTransformed = firstStream.map((k, v) -> {
try {
InputModelOne model = mapper.readValue(v, InputModelOne.class);
return new KeyValue<>(model.getId(), v);
} catch (Exception e) {
logger.error(e.toString());
return new KeyValue<>(k, v);
}
});

// Second stream
KStream<String, String> secondStream = builder.stream("topic-2",
Consumed.with(Serdes.String(), Serdes.String()));
KStream<String, String> secondStreamTransformed = secondStream.map((k, v) -> {
try {
InputModelTwo model = mapper.readValue(v, InputModelTwo.class);
return new KeyValue<>(model.getId(), v);
} catch (Exception e) {
logger.error(e.toString());
return new KeyValue<>(k, v);
}
});
// Build KTable from second topic
KTable<String, String> secondTable = secondStreamTransformed.toTable(Materialized.as("topic-2-table"));

// Valuejoiner
ValueJoiner<String, String, String> joiner = (one, two) -> {
try {

InputModelOne modelOne = mapper.readValue(one, InputModelOne.class);
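// In a left join, `two` is null when the table has no entry for the key.
InputModelTwo modelTwo = (two == null) ? new InputModelTwo() : mapper.readValue(two, InputModelTwo.class);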
// Create output object with properties
OutputModel out = new OutputModel(modelOne.getId());
out.setOneTimestamp(modelOne.getTimestamp());
out.setTwoTimestamp(modelTwo.getTimestamp());
return mapper.writeValueAsString(out);
} catch (JsonProcessingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
return null;
}
};
KStream<String, String> joined = firstStreamTransformed.leftJoin(secondTable,
joiner);

joined.to("joined-topics", Produced.with(Serdes.String(), Serdes.String()));

The error is as follows:

org.apache.kafka.streams.errors.ProcessorStateException: Error opening store joined-topics at location /var/folders/lx/dz_x9j5d7lz4mfymgzkcn7wr0000gn/T/kafka-streams/streams-pipe/2_0/rocksdb/joined-topics
at org.apache.kafka.streams.state.internals.RocksDBTimestampedStore.openRocksDB(RocksDBTimestampedStore.java:87) ~[kafka-streams-2.8.0.jar:na]
at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:186) ~[kafka-streams-2.8.0.jar:na]
at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:254) ~[kafka-streams-2.8.0.jar:na]
at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:55) ~[kafka-streams-2.8.0.jar:na]
at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:55) ~[kafka-streams-2.8.0.jar:na]
at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:55) ~[kafka-streams-2.8.0.jar:na]
at org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:75) ~[kafka-streams-2.8.0.jar:na]
at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:55) ~[kafka-streams-2.8.0.jar:na]
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$1(MeteredKeyValueStore.java:122) ~[kafka-streams-2.8.0.jar:na]
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884) ~[kafka-streams-2.8.0.jar:na]
at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:122) ~[kafka-streams-2.8.0.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:201) ~[kafka-streams-2.8.0.jar:na]
at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:103) ~[kafka-streams-2.8.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:216) ~[kafka-streams-2.8.0.jar:na]
at org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:433) ~[kafka-streams-2.8.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:849) ~[kafka-streams-2.8.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:731) ~[kafka-streams-2.8.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583) ~[kafka-streams-2.8.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556) ~[kafka-streams-2.8.0.jar:na]
Caused by: org.rocksdb.RocksDBException: Column family not found: keyValueWithTimestamp
at org.rocksdb.RocksDB.open(Native Method) ~[rocksdbjni-6.29.4.1.jar:na]
at org.rocksdb.RocksDB.open(RocksDB.java:306) ~[rocksdbjni-6.29.4.1.jar:na]
at org.apache.kafka.streams.state.internals.RocksDBTimestampedStore.openRocksDB(RocksDBTimestampedStore.java:75) ~[kafka-streams-2.8.0.jar:na]
... 18 common frames omitted
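
Each frame in the trace above ends with the jar it was loaded from, so the trace itself shows which library versions were on the classpath. As a minimal sketch (the class name `TraceJars` and the helper `jarsIn` are made up for illustration and are not part of Kafka or RocksDB), the following pulls the distinct jar names out of a few of those frames:

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TraceJars {
    // Matches the "~[some-artifact-1.2.3.jar:na]" suffix that the logger appends to each frame.
    private static final Pattern JAR = Pattern.compile("~\\[([^\\]:]+\\.jar):[^\\]]*\\]");

    // Hypothetical helper: returns the distinct jar names in order of first appearance.
    static Set<String> jarsIn(List<String> frames) {
        Set<String> jars = new LinkedHashSet<>();
        for (String frame : frames) {
            Matcher m = JAR.matcher(frame);
            if (m.find()) {
                jars.add(m.group(1));
            }
        }
        return jars;
    }

    public static void main(String[] args) {
        // Three lines copied from the trace; the last one carries no jar suffix.
        List<String> frames = List.of(
            "at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:186) ~[kafka-streams-2.8.0.jar:na]",
            "at org.rocksdb.RocksDB.open(RocksDB.java:306) ~[rocksdbjni-6.29.4.1.jar:na]",
            "... 18 common frames omitted");
        // prints [kafka-streams-2.8.0.jar, rocksdbjni-6.29.4.1.jar]
        System.out.println(jarsIn(frames));
    }
}
```

For this trace the result is kafka-streams 2.8.0 running alongside rocksdbjni 6.29.4.1. Whether that pairing is the actual cause is not established here, but it is a useful thing to compare against the versions declared in the build file.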

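I am using Docker for Kafka and Zookeeper, and Kafka is running locally. Any help or suggestions would be appreciated. I really hope I can keep using my Mac instead of switching to a lower-quality display. Cheers, folks!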

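Daily reminder to update all dependencies and restart the IDE. These are the dependencies after the update, with the previous versions left in comments: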

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<!-- <version>2.8.0</version> -->
<version>3.2.3</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<!-- <version>2.8.0</version> -->
<version>3.2.3</version>
</dependency>

<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-streams-avro-serde</artifactId>
<!-- <version>7.0.1</version> -->
<version>7.2.1</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-serializer</artifactId>
<!-- <version>7.0.1</version> -->
<version>7.2.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.rocksdb/rocksdbjni -->
<dependency>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
<!-- <version>6.29.4.1</version> -->
<version>7.5.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.squareup.okhttp3/okhttp -->
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>4.10.0</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.9.0</version>
</dependency>
</dependencies>
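
After changing versions and restarting the IDE, it can help to confirm which jars the running application actually loads, since a stale classpath would keep the old ones. The following is only a sketch under that assumption: `WhichJar` and `locate` are hypothetical names, and the lookup uses plain JDK reflection so it compiles without Kafka or RocksDB on the classpath.

```java
import java.security.CodeSource;

public class WhichJar {
    // Hypothetical helper: reports where a class would be loaded from,
    // or a short note when that information is unavailable.
    static String locate(String className) {
        try {
            // initialize=false: look the class up without running its static initializers.
            Class<?> cls = Class.forName(className, false, WhichJar.class.getClassLoader());
            CodeSource src = cls.getProtectionDomain().getCodeSource();
            // JDK classes loaded by the bootstrap loader have no code source.
            if (src == null || src.getLocation() == null) {
                return "(no code source)";
            }
            return src.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not on classpath)";
        }
    }

    public static void main(String[] args) {
        String[] names = {
            "org.apache.kafka.streams.KafkaStreams",
            "org.rocksdb.RocksDB"
        };
        for (String name : names) {
            System.out.println(name + " -> " + locate(name));
        }
    }
}
```

Run from inside the application's classpath, the printed locations should point at the jar versions declared above; if they still show the old versions, the build or IDE has not picked up the change.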
