OSGI kafka 流应用程序抛出 LogAndFailExceptionHandler



我无法在Apache Felix框架中运行我简单的Kafka流应用程序,将其作为普通jar运行可以正常工作。它引发以下异常:

ERROR: bundle com.openet.odf.streamer-simple:1.0.0.SNAPSHOT (149)[com.openet.streamer.impl.streamerImpl(0)] : The activate method has thrown an exception                                                                [0/609]
java.lang.ExceptionInInitializerError
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:544)
at com.openet.streamer.impl.streamerImpl.activate(streamerImpl.java:122)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
...
Caused by: org.apache.kafka.common.config.ConfigException: Invalid value org.apache.kafka.streams.errors.LogAndFailExceptionHandler for configuration default.deserialization.exception.handler: Class org.apache.kafka.streams.
errors.LogAndFailExceptionHandler could not be found.

我的代码:

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "10.0.150.12:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder(); 
builder.stream("test").to("streams-pipe-output"); 
Topology topology = builder.build();         
KafkaStreams streams = new KafkaStreams(topology, props); // throws error here

将Java 8与Apache Felix Framework 6.0.3一起使用

重现步骤:

  1. 将 org.apache.servicemix.bundles.kafka-clients-2.3.1_1.jar复制到捆绑文件夹
  2. 将 org.apache.servicemix.bundles.kafka-streams-2.3.1_1.jar 复制到捆绑文件夹
  3. 将应用程序 jar 复制到捆绑文件夹
  4. Java -jar bin/felix.jar

感谢任何帮助/指示。

我设法通过执行以下操作解决了我的问题,这要归功于一位比我更熟悉 OSGi 的同事 =(

更多细节在这里: OSGi 类加载

KafkaStreams streams = null;
ClassLoader currentCL = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(LogAndFailExceptionHandler.class.getClassLoader());
streams = new KafkaStreams(builder.build(), props);
streams.start(); 
}
catch (Exception e) {
System.out.println(e.getMessage());
}
finally {
Thread.currentThread().setContextClassLoader(currentCL);
}

好的,遇到了另一个问题: 此解决方案仅在执行微不足道的操作时有效。当做更有意义的事情,如计数等时,我会收到与 rocksdb 相关的错误。

Caused by: java.lang.NoClassDefFoundError: org/rocksdb/Options
at org.apache.kafka.streams.state.internals.RocksDbKeyValueBytesStoreSupplier.get(RocksDbKeyValueBytesStoreSupplier.java:41)
at org.apache.kafka.streams.state.internals.RocksDbKeyValueBytesStoreSupplier.get(RocksDbKeyValueBytesStoreSupplier.java:23)
at org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilder.build(TimestampedKeyValueStoreBuilder.java:55)
at org.apache.kafka.streams.state.internals.TimestampedKeyValueStoreBuilder.build(TimestampedKeyValueStoreBuilder.java:35)
at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder$StateStoreFactory.build(InternalTopologyBuilder.java:135)
at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.buildProcessorNode(InternalTopologyBuilder.java:953)
at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.build(InternalTopologyBuilder.java:856)
at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.build(InternalTopologyBuilder.java:809)
at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.build(InternalTopologyBuilder.java:792)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:671)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:634)
at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:544)
at com.openet.streamer.impl.streamerImpl.activate(streamerImpl.java:69)
... 81 common frames omitted
Caused by: java.lang.ClassNotFoundException: org.rocksdb.Options not found by org.apache.servicemix.bundles.kafka-streams [150]
at org.apache.felix.framework.BundleWiringImpl.findClassOrResourceByDelegation(BundleWiringImpl.java:1639)
at org.apache.felix.framework.BundleWiringImpl.access$200(BundleWiringImpl.java:80)
at org.apache.felix.framework.BundleWiringImpl$BundleClassLoader.loadClass(BundleWiringImpl.java:2053)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

我认为它与包着色有关。尝试显式设置配置(即default.deserialization.exception.handler(以在分片之前覆盖指向原始包名称(即org.apache.kafka.streams.errors.LogAndContinueExceptionHandler(的默认值。

我希望您还需要覆盖其他可能遇到相同问题的默认配置。查看文档,了解有关配置及其默认值的更多详细信息:https://docs.confluent.io/current/streams/developer-guide/config-streams.html

相关内容

  • 没有找到相关文章

最新更新