Problems with Spark Structured Streaming on Kubernetes



I am getting warnings and errors when running Structured Streaming on a K8s cluster. Here is part of my code:

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokerServers)
  .option("subscribe", topicName)
  .load()

val query = df.writeStream
  .outputMode("append")
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    val rstDF = batchDF.select($"value")
      .map(row => valueDeserializer.deserialize(topicName, row.getAs[Array[Byte]]("value"), topicValueAvroSchema).toString)
      .transform(runner.spark.read.json)
      .transform(trimDF)
    println(s"Batch $batchId: ${rstDF.count} rows")
    rstDF.show(false)
  }
  .trigger(Trigger.ProcessingTime("120 seconds"))
  .start()

query.awaitTermination()

The first batch (batch 0) runs fine. However, once data arrives for batch 1, I get warnings about lost tasks caused by a java.lang.NullPointerException:

...
19/10/12 02:02:18 ERROR TaskSetManager: Task 0 in stage 2.0 failed 4 times; aborting job
...
19/10/12 02:02:18 INFO DAGScheduler: ResultStage 2 (start at MergeKafkaToDelta.scala:124) failed in 17.980 s due to Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 4, 172.30.170.218, executor 3): java.lang.NullPointerException
...
19/10/12 02:02:18 ERROR MicroBatchExecution: Query [id = e1f15e44-ad17-452d-97cf-def26f729f38, runId = c0b7c2ba-fca4-4538-8095-cbe2daeec525] terminated with error
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 4, 172.30.170.218, executor 3): java.lang.NullPointerException
...

Do you know the root cause? And how should the configuration and parameters for spark-submit be set? I have a related blog post, Spark Streaming Checkpoint on Kubernetes, but it is based on a specific cloud platform. Do you know a general solution?
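For reference, a minimal, platform-neutral spark-submit invocation against a Kubernetes master might look like the following. Everything in angle brackets is a placeholder, and pointing the checkpoint location at durable storage is my assumption about what the blog post's cloud-specific setup was solving, not something taken from the logs:

```shell
spark-submit \
  --master k8s://https://<k8s-apiserver-host>:<port> \
  --deploy-mode cluster \
  --name structured-streaming-demo \
  --class MergeKafkaToDelta \
  --conf spark.executor.instances=3 \
  --conf spark.kubernetes.container.image=<your-spark-image> \
  --conf spark.kubernetes.namespace=<namespace> \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=<service-account> \
  --conf spark.sql.streaming.checkpointLocation=<durable-checkpoint-uri> \
  local:///opt/spark/jars/<your-app>.jar
```

The key point for streaming jobs is that the checkpoint location must survive pod restarts, so it should live outside the driver pod (e.g. object storage or a persistent volume).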

It looks like spark-on-k8s-operator is a solution: Spark Structured Streaming Applications on Kubernetes. And there is a new release for Spark 2.4.4: spark-on-k8s-operator.

So I am working through it and trying to build an example for Structured Streaming.

Latest update