Spark Kafka connector throws a NullPointerException after writing messages to the broker



I am using the spark-kafka driver to write a Dataset to a Kafka broker. Here is the sample code:

dataset
    .write()
    .format("kafka")
    .option("kafka.bootstrap.servers", kafkaProducer.getKafkaBrokerUrl())
    .option("topic", kafkaProducer.getTopic())
    .option(KEY_SERIALIZER, kafkaProducer.getKafkaConfig().getKeySerializer())
    .option(VALUE_SERIALIZER, kafkaProducer.getKafkaConfig().getValueSerializer())
    .save();
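For context, the Kafka sink consumes an optional "key" column and a mandatory "value" column (string or binary) from the Dataset and serializes them internally. The sketch below shows one way such a Dataset might be prepared before the write above; sourceDataset and the chosen expressions are illustrative assumptions, not part of my actual code.

// Hypothetical preparation of the Dataset before the write shown above.
// The Kafka sink reads the "key" (optional) and "value" (mandatory) columns,
// both of string or binary type, and handles serialization itself.
Dataset<Row> dataset = sourceDataset.selectExpr(
        "CAST(id AS STRING) AS key",
        "to_json(struct(*)) AS value");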

Although this does write the contents of the Dataset to the broker, I get the following error for every record in the Dataset.

ERROR ProducerBatch:207: Error executing user-provided callback on message for topic-partition 'Kafka-topic-0'
java.lang.NullPointerException
at org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1422)
at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:201)
at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:591)
at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:567)
at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:481)
at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74)
at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:692)
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101)
at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
at java.lang.Thread.run(Thread.java:748)

I went through the Spark Structured Streaming documentation and found no mention of callbacks. I am using the following library dependency:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
    <version>${spark.kafka.client.version}</version>
</dependency>

Can anyone help me resolve this issue?

I don't see anything wrong with the code. My only suspicion is the dependency being used (not sure whether that particular version has a bug). Below are the exact versions I use without running into any issues. Please test the same thing with these versions and see whether that helps.

If that does not help, please update the question with your complete pom.xml so that we can look into this further.

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.4.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.4.2</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
    <version>2.4.2</version>
</dependency>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.11.8</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.4.2</version>
</dependency>
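As a starting point, here is a minimal, self-contained batch write I would test against those 2.4.2 artifacts. The broker address and topic name are placeholders, and the Dataset is generated on the fly; it sets no key/value serializer options, since the Kafka sink serializes the key and value columns itself.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class KafkaBatchWriteTest {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("kafka-batch-write-test")
                .master("local[*]") // local run, just for this test
                .getOrCreate();

        // Small in-memory Dataset shaped the way the Kafka sink expects:
        // an optional "key" column and a mandatory "value" column.
        Dataset<Row> data = spark.range(10).selectExpr(
                "CAST(id AS STRING) AS key",
                "CONCAT('message-', CAST(id AS STRING)) AS value");

        data.write()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
                .option("topic", "test-topic")                       // placeholder topic
                .save();

        spark.stop();
    }
}

If this runs cleanly in your environment, the difference is most likely in the versions or in how the Dataset and options are built in your project.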
