NoSuchElementException raised in a streaming query, possibly due to a heavy cluster configuration



Can someone help me understand the reason behind this error:

ERROR Query alert [id = d19f51b1-8131-40dd-ab62, runId = 276833a0-235f-4d2e-bd61] terminated with error
java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:347)
at scala.None$.get(Option.scala:345)
at org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker$.metrics(BasicWriteStatsTracker.scala:180)
at org.apache.spark.sql.execution.streaming.FileStreamSink.basicWriteJobStatsTracker(FileStreamSink.scala:103)
at org.apache.spark.sql.execution.streaming.FileStreamSink.addBatch(FileStreamSink.scala:140)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:568)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:111)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:240)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:97)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:170)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:566)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:251)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:61)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:565)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:207)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:175)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:175)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:251)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:61)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:175)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:169)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:296)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)

The cluster configuration is:

Databricks Runtime 5.5 LTS

Scala 2.11

Spark 2.4.3

Driver: 64 GB memory, 16 cores, 3 DBU

Workers: 64 GB memory, 16 cores, 3 DBU (2-4 workers, autoscaling enabled)

3 streaming queries run in parallel, with their pools defined in fairscheduler.xml

The Spark configuration is:

spark.sql.autoBroadcastJoinThreshold=-1
spark.sql.broadcastTimeout=1200
spark.executor.instances=4
spark.executor.cores=16
spark.executor.memory=29g
spark.sql.shuffle.partitions=32
spark.default.parallelism=32
spark.driver.maxResultSize=25g
spark.scheduler.mode=FAIR
spark.scheduler.allocation.file=/dbfs/config/fairscheduler.xml

Adding the code flow below:

implicit class PipedObject[A](value: A) {
  // Apply f only when pred is true, otherwise return the value unchanged
  def conditionalPipe(f: A => A)(pred: Boolean): A =
    if (pred) f(value) else value
}

implicit val spark: SparkSession = SparkSession
  .builder()
  .appName("MyApp")
  .conditionalPipe(sess => sess.master("local[6]"))(false)
  .getOrCreate()

import spark.implicits._

val cookedData = getCookedStreamingData() // streaming data as input from Event Hub

// Query 1: write the cooked stream to dir1 in the "cook" scheduler pool
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "cook")
cookedData.writeStream
  .option("checkpointLocation", "checkpointLocation1")
  .queryName("queryName1")
  .format("avro")
  .option("path", "dir1")
  .start()

// Query 2: write the scored stream to dir2 in the "score" scheduler pool
val scoredData = score(cookedData)
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "score")
scoredData.writeStream
  .option("checkpointLocation", "checkpointLocation2")
  .queryName("queryName2")
  .format("avro")
  .option("path", "dir2")
  .start()

// Query 3: write the alert stream to dir3 in the "alert" scheduler pool
val alertData = score(scoredData)
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "alert")
alertData.writeStream
  .option("checkpointLocation", "checkpointLocation3")
  .queryName("queryName3")
  .format("avro")
  .option("path", "dir3")
  .start()
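
For completeness, a minimal sketch of how the started queries would typically be kept alive in a standalone driver application (not needed in a Databricks notebook, and not part of the original snippet):

// Hypothetical addition, not in the original code flow:
// block the driver until any of the started streaming queries terminates or fails.
spark.streams.awaitAnyTermination()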

Sample fairscheduler.xml file:

<allocations>
  <pool name="default">
    <schedulingMode>FIFO</schedulingMode>
    <weight>2</weight>
    <minShare>2</minShare>
  </pool>
  <pool name="cook">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>5</minShare>
  </pool>
  <pool name="score">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>5</minShare>
  </pool>
  <pool name="alert">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>5</minShare>
  </pool>
</allocations>

java.util.NoSuchElementException: None.get

This is purely a Scala programming error on your side. Since there is no code snippet, I cannot point out the exact spot.

If you are working with an Option, you need to check isDefined before calling get on it, or you can use Option's getOrElse() function to provide a default value.
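
For illustration, a minimal sketch of both approaches (the Option value here is hypothetical, not taken from the question):

// Hypothetical example of safe Option handling
val maybeName: Option[String] = None

// Guard with isDefined before calling get ...
val name1 = if (maybeName.isDefined) maybeName.get else "unknown"

// ... or supply a default with getOrElse
val name2 = maybeName.getOrElse("unknown")

// Calling get directly on a None is what produces the error in the stack trace:
// maybeName.get  // java.util.NoSuchElementException: None.get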

It can also happen if you are using multiple SparkContexts...

Take a look at this: Spark Streaming Exception: java.util.NoSuchElementException: None.get
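
A rough sketch of that idea, assuming the fix is to obtain one shared session via getOrCreate() and start every query from it (illustrative only, not the original author's code):

import org.apache.spark.sql.SparkSession

// getOrCreate() returns the already-active session instead of creating a second
// SparkContext, so all streaming queries share a single context.
val sharedSpark: SparkSession = SparkSession.builder()
  .appName("MyApp")
  .getOrCreate()

// Switch scheduler pools with a local property on the shared context,
// rather than by creating a separate session or context per query.
sharedSpark.sparkContext.setLocalProperty("spark.scheduler.pool", "cook")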
