String prefix = "B";
Properties properties = new Properties();
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-word-pattern");
StreamsBuilder streamsBuilder = new StreamsBuilder();
//source
KStream<String, String> stream = streamsBuilder.stream(SOURCE_TEST_TOPIC);
//word processor
KStream<String, String> wordProcessor = stream.flatMapValues(s -> Arrays.asList(s.split(",")));
//match
KStream<String, String> matchProcessor =
wordProcessor.filter((key, value) -> value.toUpperCase().startsWith(prefix));
matchProcessor.to(WORD_TOPIC);
Topology topology = streamsBuilder.build();
try (KafkaStreams kafkaStreams = new KafkaStreams(topology, properties)) {
System.err.println("Stream is starting...");
kafkaStreams.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.err.println("Stream is closing...");
kafkaStreams.close();
}));
}
当我运行此流时,引发了以下异常:
线程中的异常"主";java.lang.NoSuchFieldError:TRACE位于org.apache.kafka.streams.StreamsConfig.(StreamsConfig.java:766(网址:org.apache.kafka.streams.KafkaStreams。(KafkaStreams.java:693(在kafkacustom.streams.KafkaStreamsExample.main(KafkaStreamsExample.java:42(
如何修复
这是由于库不兼容导致的,请检查您的org.apache.kafka:kafka-clients
和org.apache.kafka:kafka-streams
版本。
在我的特定情况下,org.apache.kafka:kafka-clients:6.0.0-ccs
(汇流库(已经从RecordingLevel
传感器内部类中删除了这个TRACE
枚举值,而我使用的是开源版本的org.apache.kafka:kafka-streams:3.1.1
。所以我刚刚更新到融合版本6.0.0-css
并使用
为了进一步了解您的情况,请单击类StreamsConfig
的控制台日志跟踪,应该是第794行,并检查Sensor
类的内部类RecordingLevel
是否具有TRACE
枚举值
对于";官方的";repo,它仍然会重新生成这个枚举值