Update
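To provide some context: the Kafka topic has LZ4 compression enabled.
After identifying the compression type, the CompressionType enum tries to associate LZ4 with KafkaLZ4BlockInputStream, which, for some reason, apparently cannot be found at runtime.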
@Override
public InputStream wrapForInput(ByteBuffer inputBuffer, byte messageVersion, BufferSupplier decompressionBufferSupplier) {
    try {
        return new KafkaLZ4BlockInputStream(inputBuffer, decompressionBufferSupplier,
                messageVersion == RecordBatch.MAGIC_VALUE_V0);
    } catch (Throwable e) {
        throw new KafkaException(e);
    }
}
Original post
I have a JAR containing a Flink job that consumes records from Kafka.
After submitting the JAR to Flink, the task manager reports that the KafkaLZ4BlockInputStream class is missing:
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: org.apache.kafka.common.KafkaException: Received exception when fetching the next record from flight-cache-3. If needed, please seek past the record to continue consumption.
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1553)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1374)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:676)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:631)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1313)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.fetch(KafkaPartitionSplitReader.java:113)
at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
... 6 more
Caused by: org.apache.kafka.common.KafkaException: java.lang.NoClassDefFoundError: Could not initialize class org.apache.kafka.common.record.KafkaLZ4BlockInputStream
at org.apache.kafka.common.record.CompressionType$4.wrapForInput(CompressionType.java:113)
at org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:261)
at org.apache.kafka.common.record.DefaultRecordBatch.streamingIterator(DefaultRecordBatch.java:346)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.nextFetchedRecord(Fetcher.java:1496)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1533)
... 15 more
Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.kafka.common.record.KafkaLZ4BlockInputStream
at org.apache.kafka.common.record.CompressionType$4.wrapForInput(CompressionType.java:110)
... 19 more
I unpacked the JAR into a folder and found that the class is indeed there:
$ ls -l org/apache/kafka/common/record/KafkaLZ4BlockInputStream.class
-rw-r--r-- 1 wxh wxh 6489 Mar 2 2020 org/apache/kafka/common/record/KafkaLZ4BlockInputStream.class
I can also confirm that org.apache.kafka:kafka-clients:jar:2.4.1, which contains the class, is indeed present in the dependency tree:
+- org.apache.flink:flink-core:jar:1.14.0:compile
| +- org.apache.flink:flink-annotations:jar:1.14.0:compile
| +- org.apache.flink:flink-metrics-core:jar:1.14.0:compile
| +- org.apache.flink:flink-shaded-asm-7:jar:7.1-14.0:compile
| +- org.apache.commons:commons-lang3:jar:3.3.2:compile
| +- com.esotericsoftware.kryo:kryo:jar:2.24.0:compile
| | +- com.esotericsoftware.minlog:minlog:jar:1.2:compile
| | - org.objenesis:objenesis:jar:2.1:compile
| +- commons-collections:commons-collections:jar:3.2.2:compile
| +- org.apache.commons:commons-compress:jar:1.21:compile
| +- org.apache.flink:flink-shaded-guava:jar:30.1.1-jre-14.0:compile
| +- org.slf4j:slf4j-api:jar:1.7.15:compile
| +- com.google.code.findbugs:jsr305:jar:1.3.9:compile
| - org.apache.flink:flink-shaded-force-shading:jar:14.0:compile
+- org.apache.flink:flink-clients_2.11:jar:1.14.0:compile
| +- org.apache.flink:flink-runtime:jar:1.14.0:compile
| | +- org.apache.flink:flink-rpc-core:jar:1.14.0:compile
| | +- org.apache.flink:flink-rpc-akka-loader:jar:1.14.0:compile
| | +- org.apache.flink:flink-queryable-state-client-java:jar:1.14.0:compile
| | +- org.apache.flink:flink-hadoop-fs:jar:1.14.0:compile
| | +- commons-io:commons-io:jar:2.8.0:compile
| | +- org.apache.flink:flink-shaded-netty:jar:4.1.65.Final-14.0:compile
| | +- org.apache.flink:flink-shaded-jackson:jar:2.12.4-14.0:compile
| | +- org.apache.flink:flink-shaded-zookeeper-3:jar:3.4.14-14.0:compile
| | +- org.javassist:javassist:jar:3.24.0-GA:compile
| | +- org.xerial.snappy:snappy-java:jar:1.1.8.3:compile
| | - org.lz4:lz4-java:jar:1.8.0:compile
| +- org.apache.flink:flink-optimizer:jar:1.14.0:compile
| +- org.apache.flink:flink-java:jar:1.14.0:compile
| | - org.apache.commons:commons-math3:jar:3.5:compile
| +- commons-cli:commons-cli:jar:1.3.1:compile
| - org.apache.flink:flink-streaming-java_2.11:jar:1.14.0:compile
| - org.apache.flink:flink-scala_2.11:jar:1.14.0:compile
| +- org.scala-lang:scala-reflect:jar:2.11.12:compile
| +- org.scala-lang:scala-library:jar:2.11.12:compile
| +- org.scala-lang:scala-compiler:jar:2.11.12:compile
| | +- org.scala-lang.modules:scala-xml_2.11:jar:1.0.5:compile
| | - org.scala-lang.modules:scala-parser-combinators_2.11:jar:1.0.4:compile
| - com.twitter:chill_2.11:jar:0.7.6:compile
| - com.twitter:chill-java:jar:0.7.6:compile
+- org.apache.flink:flink-connector-files:jar:1.14.0:compile
| +- org.apache.flink:flink-file-sink-common:jar:1.14.0:compile
| - org.apache.flink:flink-connector-base:jar:1.14.0:compile
+- org.apache.flink:flink-connector-kafka_2.11:jar:1.14.0:compile
| - org.apache.kafka:kafka-clients:jar:2.4.1:compile # <--- IT DOES EXIST
| - com.github.luben:zstd-jni:jar:1.4.3-1:compile
+- org.projectlombok:lombok:jar:1.18.22:provided
Has anyone run into this strange problem before?
Thanks!
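It turns out that LZ4Factory loads the net.jpountz.lz4.LZ4*Compressor/Decompressor classes via reflection, while maven-shade-plugin was configured with <minimizeJar>true</minimizeJar>, which removes every class that is not directly referenced, including classes that are only loaded via reflection.
So when KafkaLZ4BlockInputStream tries to access one of these classes through LZ4Factory during construction, it fails.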
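The "Could not initialize class" wording in the stack trace hints at why the class file can be present in the JAR and still produce a NoClassDefFoundError: on the JVM, that message is reported when a class was found but its static initialization had already failed earlier. The following is a minimal sketch of that behaviour. StaticInitDemo and Fragile are hypothetical names made up for illustration and are not Kafka code; the failing load() method merely stands in for a lookup of a class that was stripped from the JAR.

```java
class StaticInitDemo {

    // Hypothetical stand-in for a class whose static state depends on
    // something that is missing at runtime (not Kafka's actual code).
    static class Fragile {
        // Evaluated once, when the class is initialized.
        static final Object RESOURCE = load();

        private static Object load() {
            // Simulates a reflective lookup that fails at runtime.
            throw new IllegalStateException("dependency missing at runtime");
        }

        static void touch() {
            // Calling any static method triggers class initialization.
        }
    }

    // Returns "ok" or the class name of whatever was thrown.
    static String tryTouch() {
        try {
            Fragile.touch();
            return "ok";
        } catch (Throwable t) {
            return t.getClass().getName();
        }
    }

    public static void main(String[] args) {
        // First use: the static initializer runs and fails.
        System.out.println(tryTouch()); // java.lang.ExceptionInInitializerError
        // Later uses: the JVM remembers the failure and refuses to retry.
        System.out.println(tryTouch()); // java.lang.NoClassDefFoundError
    }
}
```

If the job logs are searched from the beginning, the first failure (an ExceptionInInitializerError or a similar error carrying the real cause) usually appears before the repeated "Could not initialize class" entries.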
Setting <minimizeJar>false</minimizeJar> solved it.
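One way to check whether a shaded JAR still contains the reflectively loaded classes is to try loading them by name with the JAR on the classpath. The sketch below is only an illustration: ShadedClassCheck and missingClasses are hypothetical names, and the default class names are assumptions based on the net.jpountz.lz4.LZ4*Compressor/Decompressor pattern mentioned above, so they may need adjusting for the lz4-java version in use.

```java
import java.util.ArrayList;
import java.util.List;

class ShadedClassCheck {

    // Returns the subset of the given class names that cannot be loaded
    // from the current classpath.
    static List<String> missingClasses(String... names) {
        List<String> missing = new ArrayList<>();
        ClassLoader loader = ShadedClassCheck.class.getClassLoader();
        for (String name : names) {
            try {
                // initialize=false: only check presence, do not run static initializers.
                Class.forName(name, false, loader);
            } catch (ClassNotFoundException | LinkageError e) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Assumed class names following the LZ4*Compressor/Decompressor pattern;
        // pass other names as arguments to override them.
        String[] defaults = {
            "net.jpountz.lz4.LZ4Factory",
            "net.jpountz.lz4.LZ4JavaSafeCompressor",
            "net.jpountz.lz4.LZ4JavaSafeFastDecompressor",
            "net.jpountz.lz4.LZ4JavaSafeSafeDecompressor"
        };
        String[] names = args.length > 0 ? args : defaults;
        List<String> missing = missingClasses(names);
        if (missing.isEmpty()) {
            System.out.println("All classes are loadable.");
        } else {
            System.out.println("Missing classes: " + missing);
        }
    }
}
```

Running this with the shaded job JAR on the classpath, once built with minimizeJar enabled and once with it disabled, should show whether the minimization step is what removes the classes.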