Caused by: org.apache.kafka.common.record.InvalidRecordException



We ran into a problem consuming messages with kafka-2.2.1 when the Kafka broker is configured with compression.type = zstd. The full exception stack is:

org.apache.kafka.common.KafkaException: Received exception when fetching the next record from test-10. If needed, please seek past the record to continue consumption.
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1519)
at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1374)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:676)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:631)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1282)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1168)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread.run(KafkaConsumerThread.java:258)
Caused by: org.apache.kafka.common.InvalidRecordException: Incorrect declared batch size, records still remaining in file
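
For context, the fetch path in the stack trace is the one a plain KafkaConsumer poll loop goes through. Below is a minimal sketch against kafka-clients 2.2.1 that exercises the same path; the bootstrap address, group id and deserializers are assumptions, and the topic name "test" is taken from the partition test-10 above:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ZstdConsumeCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        props.put("group.id", "zstd-check");              // assumed group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // zstd batches are decompressed inside poll(); with a mismatched zstd-jni
        // on the classpath this is where the KafkaException above surfaces.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
            }
        }
    }
}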

We finally solved the problem. We checked the jar versions used by the Kafka consumer and found that its zstd-jni-1.3.2-2.jar was older than the broker's zstd-jni-1.3.8-1.jar. The versions conflicted in our pom.xml because we depend on spark-core, which transitively pulls in zstd-jni-1.3.2-2.jar. Excluding zstd-jni from spark-core fixed the issue, as shown in the dependencies below. So check the versions of the compression/decompression jars used by the broker and the client (e.g. zstd, lz4, snappy, gzip).

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_${scala.binary.version}</artifactId>
    <version>${spark.version}</version>
    <scope>${spark.scope}</scope>
    <exclusions>
        <exclusion>
            <artifactId>zstd-jni</artifactId>
            <groupId>com.github.luben</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.2.1</version>
</dependency>
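
To confirm which zstd-jni the consumer JVM actually loads after the exclusion, a small sketch (not part of the original answer; the class name is hypothetical) prints the jar location and manifest version of com.github.luben.zstd.Zstd:

import com.github.luben.zstd.Zstd;

public class ZstdVersionCheck {
    public static void main(String[] args) {
        // Jar the Zstd class was loaded from (should now be the one pulled in by
        // kafka-clients, e.g. zstd-jni-1.3.8-1.jar, matching the broker).
        System.out.println(Zstd.class.getProtectionDomain().getCodeSource().getLocation());
        // Implementation-Version from the jar manifest, if present.
        System.out.println(Zstd.class.getPackage().getImplementationVersion());
    }
}

Running mvn dependency:tree -Dincludes=com.github.luben:zstd-jni also shows which dependency pulls in the conflicting version.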
