Azure Databricks PySpark readStream reads non-ORC files from a mounted ADLS Gen2 input path



I am using Databricks PySpark readStream and writeStream as shown below.

I am reading ORC files from mounted ADLS Gen2 Azure storage; here is a code snippet where the files are read from a single location.

The input data is loaded into this location by a separate process:

my-storage-account/my-container/table1/input/
|- loadDate_20220621/
|       |- file1.orc
|       |- file2.orc
|- sample.json
|- file.json
import json
from pyspark.sql import types as T

domains = ['table1', 'table2']

# load the JSON schema definition for each table
data_schema = {}
for domain in domains:
    with open('{}/{}.json'.format(mount_path, domain), 'r') as jfile:
        table_schema = T.StructType.fromJson(json.loads(jfile.read()))
    data_schema[domain] = table_schema
data_readstreams = {}
for domain in domains:
    # read stream
    data_readstreams[domain] = (
        spark.readStream
        .format("orc")    # READING ORC FORMAT
        .schema(data_schema[domain])
        .load(f'{mount_path}/input/data/')
    )
## other operations to perform the merge of data
## create the merge upsert functions and store them in a dict

def mergeTbl1(df, batch_id):
    data_df = df.dropDuplicates(['field1'])
    data_df.createOrReplaceTempView("table1_temp")

    data_df._jdf.sparkSession().sql("""
        MERGE INTO my_db.table1 tbl1
        USING table1_temp tbl1u
        ON tbl1.field1 = tbl1u.field1
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)
def mergeTbl2(df, batch_id):
    data_df = df.dropDuplicates(['field2'])
    data_df.createOrReplaceTempView("table2_temp")

    data_df._jdf.sparkSession().sql("""
        MERGE INTO my_db.table2 tbl2
        USING table2_temp tbl2u
        ON tbl2.field2 = tbl2u.field2
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)
mergeUpsertFunctionList = {}
mergeUpsertFunctionList['table1'] = mergeTbl1
mergeUpsertFunctionList['table2'] = mergeTbl2

for domain in domains:
    # write stream
    data_writestream = (
        data_readstreams[domain]
        .repartition(1)
        .writeStream
        .trigger(once=True)
        .format("delta")
        .option("checkpointLocation", f"{mount_path}/checkpoints/{domain}")
        .foreachBatch(mergeUpsertFunctionList[domain])
        .start()
    )

Below is the exception I get, which is strange: since the read stream uses the ORC format, I expect Spark to read only ORC files, not JSON.

Questions:

  • This is new behaviour, since I have other notebooks that work fine. Sometimes these JSON files are locked by another process, but that should not affect the ORC files.

Why is this happening?

File "/databricks/spark/python/lib/py4j-0.10.9.1-src.zip/py4j/java_gateway.py", line 2442, in _call_proxy
return_value = getattr(self.pool[obj_id], method)(*params)
File "/databricks/spark/python/pyspark/sql/utils.py", line 202, in call
raise e
File "/databricks/spark/python/pyspark/sql/utils.py", line 199, in call
self.func(DataFrame(jdf, self.sql_ctx), batch_id)
File "<command-313423169960053>", line 15, in mergeDataTable1
df
File "/databricks/spark/python/pyspark/sql/readwriter.py", line 740, in save
self._jwrite.save(path)
File "/databricks/spark/python/lib/py4j-0.10.9.1-src.zip/py4j/java_gateway.py", line 1304, in __call__
return_value = get_return_value(
File "/databricks/spark/python/pyspark/sql/utils.py", line 117, in deco
return f(*a, **kw)
File "/databricks/spark/python/lib/py4j-0.10.9.1-src.zip/py4j/protocol.py", line 326, in get_return_value
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o855.save.
: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.errors.QueryExecutionErrors$.jobAbortedError(QueryExecutionErrors.scala:606)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:360)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:198)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:126)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:124)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:138)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$1(QueryExecution.scala:160)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$8(SQLExecution.scala:213)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:360)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:160)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:968)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:115)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:310)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:160)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:156)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:575)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:167)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:575)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:268)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:264)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:551)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$1(QueryExecution.scala:156)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:324)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:156)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:141)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:132)
at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:186)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:959)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:427)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:396)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:250)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:295)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:251)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 10) (10.44.216.76 executor 0): com.databricks.sql.io.FileReadException: Error while reading file dbfs:/mnt_path/input............../file.json.
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.logFileNameAndThrow(FileScanRDD.scala:521)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.getNext(FileScanRDD.scala:494)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:614)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.$anonfun$hasNext$1(FileScanRDD.scala:356)
at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:351)
at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:1017)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$3(ShuffleMapTask.scala:81)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$1(ShuffleMapTask.scala:81)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:156)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:125)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:95)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:826)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1681)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:829)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:684)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.orc.FileFormatException: Malformed ORC file dbfs:/mount_path/input....../.../sample.json. Invalid postscript.
at org.apache.orc.impl.ReaderImpl.ensureOrcFooter(ReaderImpl.java:462)
at org.apache.orc.impl.ReaderImpl.extractFileTail(ReaderImpl.java:795)
at org.apache.orc.impl.ReaderImpl.<init>(ReaderImpl.java:564)
at org.apache.orc.OrcFile.createReader(OrcFile.java:385)
at org.apache.spark.sql.execution.datasources.orc.OrcFileFormat.$anonfun$buildReaderWithPartitionValues$2(OrcFileFormat.scala:146)
at org.apache.spark.util.Utils$.tryWithResource(Utils.scala:3035)
at org.apache.spark.sql.execution.datasources.orc.OrcFileFormat.$anonfun$buildReaderWithPartitionValues$1(OrcFileFormat.scala:146)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.getNext(FileScanRDD.scala:457)

Here are a few approaches:

  • You can use a path glob filter to pick up only the .orc files:

data_readstream = (
    spark.readStream
    .format("orc")    # READING ORC FORMAT
    .schema(data_schema[domain])
    .option("pathGlobFilter", "*.orc")
    .load(f'{mount_path}/input/data/')
)
  • Use the spark.sql.files.ignoreCorruptFiles Spark configuration (doc) to ignore files that are considered corrupted, although this is not the best solution (see the sketch below).
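
A minimal sketch of that second option, assuming the notebook-provided spark session and the same data_schema/mount_path variables from the question (spark.sql.files.ignoreCorruptFiles is a standard Spark SQL setting):

# Hedged sketch: skip files Spark cannot read as ORC instead of failing the stream.
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")

data_readstream = (
    spark.readStream
    .format("orc")
    .schema(data_schema[domain])
    .load(f'{mount_path}/input/data/')   # JSON files failing the ORC footer check are skipped
)

Note that this flag silently drops any unreadable file, so a genuinely corrupted ORC file would also be skipped without an error; the pathGlobFilter option above, which filters by file name during listing, is the more targeted fix.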
