Unable to write Event Hub data to ADLS from Databricks: runtime error

When I run the writeStream code below in Databricks:

df_out = (cdf.writeStream
    .format("json")
    .outputMode("append")
    .option("checkpointLocation", "/path/to/checkpoint/directory")
    .start("/mnt/container-name/folder-name"))

the job fails with the following runtime error. How can I resolve it?

org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:289)
at org.apache.spark.sql.execution.streaming.FileStreamSink.addBatch(FileStreamSink.scala:198)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:606)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$5(SQLExecution.scala:126)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:267)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:104)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:852)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:77)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:217)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:604)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:293)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:291)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:604)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$4(MicroBatchExecution.scala:243)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withSchemaEvolution(MicroBatchExecution.scala:647)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:240)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:293)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:291)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:209)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:203)
at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:366)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:852)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:341)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:268)
Caused by: java.io.FileNotFoundException: Unable to find batch /mnt/outputcontainer/Sample/_spark_metadata/709.compact
at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.applyFnToBatchByStream(HDFSMetadataLog.scala:279)
at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.applyFnInBatch(CompactibleFileStreamLog.scala:215)
at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.foreachInBatch(CompactibleFileStreamLog.scala:201)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:278)
... 28 more
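The root cause appears to be that the file sink's _spark_metadata log (kept inside the output directory) has fallen out of sync with the streaming checkpoint: the checkpoint still references compacted batch 709, but the corresponding .compact file no longer exists. This typically happens when the metadata folder is partially deleted or the stream is restarted against a changed output path. A quick way to confirm this on Databricks, as a sketch using the paths from the error above (the checkpoint path is the placeholder from the question):

# Hypothetical inspection step: list the sink's metadata log to check
# whether the compacted batch file is actually present.
display(dbutils.fs.ls("/mnt/outputcontainer/Sample/_spark_metadata"))

# If the two have diverged and the already-written output can be
# regenerated, one recovery option is to clear both directories and
# restart the stream from scratch (destructive; use with care):
dbutils.fs.rm("/mnt/outputcontainer/Sample/_spark_metadata", recurse=True)
dbutils.fs.rm("/path/to/checkpoint/directory", recurse=True)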

I fixed the problem above by changing the code as follows:

result = (cdf.writeStream
    .format("json")
    .option("path", "/mnt/outputcontainer/writedata2-json")
    .option("checkpointLocation", "/mnt/outputcontainer/writedata2-json")
    .start())

This works for all output formats.
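Note that this fix points checkpointLocation at the same directory as the data path, so checkpoint state and output files share one folder. Keeping the two separate is a common practice that makes it easier to reset one without touching the other; a minimal sketch of that variant (the checkpoint path here is illustrative):

result = (cdf.writeStream
    .format("json")
    .option("path", "/mnt/outputcontainer/writedata2-json")            # data files + _spark_metadata
    .option("checkpointLocation", "/mnt/checkpoints/writedata2-json")  # offsets, commits, state only
    .start())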