UnsatisfiedLinkError on librocksdbjni when developing with Kafka Streams



I am writing a Kafka Streams application on my Windows development machine. When I try to use the leftJoin and branch features of Kafka Streams, I get the error below while executing the jar application:

Exception in thread "StreamThread-1" java.lang.UnsatisfiedLinkError: C:\Users\user\AppData\Local\Temp\librocksdbjni325337723194862275.dll: Can't find dependent libraries
    at java.lang.ClassLoader$NativeLibrary.load(Native Method)
    at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1941)
    at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1824)
    at java.lang.Runtime.load0(Runtime.java:809)
    at java.lang.System.load(System.java:1086)
    at org.rocksdb.NativeLibraryLoader.loadLibraryFromJar(NativeLibraryLoader.java:78)
    at org.rocksdb.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:56)
    at org.rocksdb.RocksDB.loadLibrary(RocksDB.java:64)
    at org.rocksdb.RocksDB.<clinit>(RocksDB.java:35)
    at org.rocksdb.Options.<clinit>(Options.java:22)
    at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:115)
    at org.apache.kafka.streams.state.internals.Segment.openDB(Segment.java:38)
    at org.apache.kafka.streams.state.internals.Segments.getOrCreateSegment(Segments.java:75)
    at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:72)
    at org.apache.kafka.streams.state.internals.ChangeLoggingSegmentedBytesStore.put(ChangeLoggingSegmentedBytesStore.java:54)
    at org.apache.kafka.streams.state.internals.MeteredSegmentedBytesStore.put(MeteredSegmentedBytesStore.java:101)
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:109)
    at org.apache.kafka.streams.state.internals.RocksDBWindowStore.put(RocksDBWindowStore.java:101)
    at org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:65)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
    at org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:43)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
    at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:48)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:134)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:83)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:70)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:641)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)

It seems that Kafka cannot find the DLL, but wait... I am developing a Java application!

What is the problem? And why doesn't this error show up if I try to perform simpler streaming operations, like filter()?

UPDATE:

This problem occurs only when there are messages in the broker. I am using Kafka Streams version 0.10.2.1.

This is the code that raises the problem:

import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

// Avro serde config and serde classes come from Confluent's artifacts
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;

public class KafkaStreamsMainClass {

    // Placeholder topic name; defined elsewhere in the real project
    private static final String SOURCE_TOPIC = "source-topic";

    private KafkaStreamsMainClass() {
    }

    public static void main(final String[] args) throws Exception {
        Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams");
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-server:9092");
        streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "schema-registry:8082");
        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);

        KStreamBuilder builder = new KStreamBuilder();
        KStream<GenericRecord, GenericRecord> sourceStream = builder.stream(SOURCE_TOPIC);

        // Stateless filter: records that already carry an endTime
        KStream<GenericRecord, GenericRecord> finishedFiltered = sourceStream
                .filter((GenericRecord key, GenericRecord value) -> value.get("endTime") != null);

        // Branch the still-running records into two sub-streams
        KStream<GenericRecord, GenericRecord>[] branchedStreams = sourceStream
                .filter((GenericRecord key, GenericRecord value) -> value.get("endTime") == null)
                .branch((GenericRecord key, GenericRecord value) -> value.get("firstField") != null,
                        (GenericRecord key, GenericRecord value) -> value.get("secondField") != null);

        // Windowed joins are stateful: their window stores are backed by RocksDB
        branchedStreams[0] = finishedFiltered.join(branchedStreams[0],
                (GenericRecord value1, GenericRecord value2) -> {
                    return value1;
                }, JoinWindows.of(TimeUnit.SECONDS.toMillis(2)));
        branchedStreams[1] = finishedFiltered.join(branchedStreams[1],
                (GenericRecord value1, GenericRecord value2) -> {
                    return value1;
                }, JoinWindows.of(TimeUnit.SECONDS.toMillis(2)));

        KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
        streams.setUncaughtExceptionHandler((Thread thread, Throwable throwable) -> {
            throwable.printStackTrace();
        });
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

I opened the rocksdbjni-5.0.1.jar archive downloaded by Maven, and it does include the librocksdbjni-win64.dll library. It seems that RocksDB is trying to retrieve the library from outside the jar, even though it is bundled inside it.
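From the stack trace, the loading flow seems to be roughly the following. This is a simplified sketch of what I understand org.rocksdb.NativeLibraryLoader to be doing (an assumed reconstruction, not the actual RocksDB source): the bundled DLL is copied out of the jar into java.io.tmpdir and then loaded with System.load(), so "Can't find dependent libraries" apparently means the extracted DLL itself was found, but one of the native libraries it links against was not.

import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Assumed sketch of the native-library load path; the real logic lives in
// org.rocksdb.NativeLibraryLoader inside the rocksdbjni jar.
public class NativeLoadSketch {
    public static void main(String[] args) throws Exception {
        // Platform-specific library bundled at the root of rocksdbjni-5.0.1.jar
        String resource = "/librocksdbjni-win64.dll";
        // Extracted into java.io.tmpdir, e.g. C:\Users\user\AppData\Local\Temp
        Path target = Files.createTempFile("librocksdbjni", ".dll");
        try (InputStream in = NativeLoadSketch.class.getResourceAsStream(resource)) {
            Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING);
        }
        // "Can't find dependent libraries" is thrown here: the extracted DLL
        // exists, but a native library it depends on cannot be resolved.
        System.load(target.toAbsolutePath().toString());
    }
}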

I am developing on a Windows 7 machine.

Have you ever faced this issue?

I ran into this problem recently as well. I managed to solve it in two steps:

  1. Delete all librocksdbjni[...].dll files from the C:\Users\[your_user]\AppData\Local\Temp folder.
  2. Add the RocksDB Maven dependency to your project; this one worked for me (see the snippet below): https://mvnrepository.com/artifact/org.rocksdb/rocksdbjni/5.0.1
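For reference, this is what the corresponding block in the pom.xml would look like (a sketch; groupId, artifactId, and version taken from the mvnrepository link above):

<!-- Explicit RocksDB JNI dependency; 5.0.1 matches the rocksdbjni jar
     that Maven already pulled in for Kafka Streams 0.10.2.1 -->
<dependency>
    <groupId>org.rocksdb</groupId>
    <artifactId>rocksdbjni</artifactId>
    <version>5.0.1</version>
</dependency>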

Compile your Kafka Streams application and run it. It should work!

I updated my kafka-streams project to the latest released version, 1.0.0.

This version suffers from this bug, but after patching it and uploading the patched build to our internal Artifactory server, we were able to run our kafka-streams agents on both Windows and Linux. The upcoming 1.0.1 and 1.1.0 releases will include this bugfix, so as soon as one of them is released we will switch to it instead of the patched version.

To sum up, the Kafka guys solved this bug with the 1.0.1 release.
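If it helps, the fix then amounts to a version bump of the kafka-streams dependency in the build file; a sketch assuming Maven:

<!-- Upgrade to a Kafka Streams release that includes the fix -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>1.0.1</version>
</dependency>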

My problem was permissions on the /tmp/ directory (CentOS).

RocksDB uses the java.io.tmpdir system property internally to decide where to place its librocksdbjni file, usually something like /tmp/librocksdbjni2925599838907625983.so.

I solved it by setting a different tempdir property in the kafka-streams application:

System.setProperty("java.io.tmpdir", "/opt/kafka-streams/tmp");
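Note that the property must be set before any RocksDB-backed state store is opened, because the native library is extracted to java.io.tmpdir on first load. A minimal sketch (the directory path is just an example):

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class TmpDirSetup {
    public static void main(final String[] args) throws Exception {
        // Pick a directory the service user is allowed to write to (example path)
        Path tmp = Paths.get("/opt/kafka-streams/tmp");
        Files.createDirectories(tmp);
        // Must run before any RocksDB-backed store opens, since the native
        // library is extracted to java.io.tmpdir the first time it loads
        System.setProperty("java.io.tmpdir", tmp.toString());
        // ... build and start the KafkaStreams topology as usual
    }
}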

You are missing some native libraries that the RocksDB DLL depends on. See https://github.com/facebook/rocksdb/issues/1302

I was facing the same issue while using JDK 1.8. It got resolved when I changed it to JRE.

Faced a similar issue on Mac. According to this link, https://github.com/facebook/rocksdb/issues/5064, the issue is related to the old libc installed in my version of Mac OS (10.11.6).
