尝试运行简单的Kafka Stream应用程序时收到异常



我一直在尝试使用Kafka运行一个简单的wordcount应用程序,但每当我运行它时,我都会收到以下错误:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/kafka/common/utils/LogContext
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:630)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:610)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:557)
at StreamsApp.main(StreamsApp.java:49)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at com.intellij.rt.execution.application.AppMainV2.main(AppMainV2.java:131)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.utils.LogContext
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:583)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)

我不知道为什么我总是犯这个错误。。。下面列出了主要方法的代码。(第49行(KafkaStreams流=新的KafkaStream(拓扑、道具(;

public static void main(final String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// props.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream("inputTopic");
final Pattern pattern = Pattern.compile("\W+", Pattern.UNICODE_CHARACTER_CLASS);
KTable<String, Long> wordCounts = textLines
.flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
.groupBy((key, word) -> word)
.count();
wordCounts
.foreach((w, c) -> System.out.println("word: " + w + " -> " + c));

String outputTopic = "outputTopic";
Serde<String> stringSerde = Serdes.String();
Serde<Long> longSerde = Serdes.Long();
wordCounts.to(stringSerde, longSerde, outputTopic);
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
Thread.sleep(30000);
streams.close();
}

}

依赖项org.apache.kafka:kafka-streamsorg.apache.kafka:kafka-clients之间存在冲突。根据您的例外情况,您使用的kafka-clients版本低于1.0.0,但kafka-streams版本等于或高于1.0.0。

请确保kafka-clients版本至少为1.0.0(因此您需要升级kafka-clients版本(,否则您需要降级kafka-streams版本。

奇怪的是为什么kafka流需要单独使用。我以为它包含在ksqldb中。然而,以下是为我解决问题的方法。如果这不正确,请大声呼喊!

<dependency>
<groupId>io.confluent.ksql</groupId>
<artifactId>ksqldb-api-client</artifactId>
<version>0.27.2</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>3.2.1</version>
</dependency>

所需的jar应该在类路径中。希望下面能帮助你。

C:PROJECTENVIRONMENTKAFKA_2_11binwindowskafka-run-class.bat -cp  C:PROJECTKafkaDemostargetclasses;C:PROJECTENVIRONMENTKAFKA_2_11libskafka_2.11-2.4.0.jar;C:PROJECTENVIRONMENTKAFKA_2_11libskafka-clients-2.4.0.jar;C:PROJECTE
NVIRONMENTKAFKA_2_11libskafka-streams-2.4.0.jar;C:PROJECTENVIRONMENTKAFKA_2_11libskafka-clients-2.4.0.jar;C:PROJECTENVIRONMENTKAFKA_2_11libsslf4j-api-1.7.28.jar;C:PROJECTENVIRONMENTExtraJarsapache-logging-log4j.jar;C:PROJECTENVIRONMENTExtraJars
rocksdbjni-6.5.3.jar; demo.wordcount.WordCount

相关内容

  • 没有找到相关文章

最新更新