Spark Structured Streaming gives me the error org.apache.spark.sql.AnalysisException: "foreachBatch" does not support partitioning;



I have written the following Structured Streaming code in Databricks to write to Azure Data Lake:

def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
  microBatchOutputDF.createOrReplaceTempView("updates")

  microBatchOutputDF.sparkSession.sql(s"""
    MERGE INTO silver AS r
    USING
    (
      SELECT smtUidNr, dcl, inv, evt, smt, msgTs, msgInfSrcCd
      FROM (
        SELECT smtUidNr, dcl, inv, evt, smt, msgTs, msgInfSrcCd
          , RANK() OVER (PARTITION BY smtUidNr ORDER BY msgTs DESC) AS rank
          , ROW_NUMBER() OVER (PARTITION BY smtUidNr ORDER BY msgTs DESC) AS row_num
        FROM updates
      )
      WHERE rank = 1 AND row_num = 1
    ) AS u
    ON u.smtUidNr = r.smtUidNr
    WHEN MATCHED AND u.msgTs > r.msgTs THEN
      UPDATE SET *
    WHEN NOT MATCHED THEN
      INSERT *
  """)
}
splitDF.writeStream
  .format("delta")
  .foreachBatch(upsertToDelta _)
  .outputMode("append")
  .partitionBy("year", "month", "day")
  .option("checkpointLocation", "abfss://checkpoint@mcfdatalake.dfs.core.windows.net/kjd/test/")
  .start("abfss://dump@mcfdatalake.dfs.core.windows.net/main_data/")

When I try to run this, it gives me the following error:

org.apache.spark.sql.AnalysisException: 'foreachBatch' does not support partitioning;

What is the alternative to using foreachBatch together with partitioning?


You cannot combine partitionBy with foreachBatch: once you use foreachBatch, the batch function is responsible for every write, so remove .partitionBy("year","month","day") from the writeStream call and handle any partitioning inside foreachBatch itself, typically when the target Delta table is first created, as sketched below.
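A minimal PySpark sketch of such a writer, assuming a stream split_df and a batch function upsert_to_delta equivalent to your Scala ones:

# Sketch only: no partitionBy and no output path on the writer itself,
# because the foreachBatch function does all of the writing.
query = (split_df.writeStream
    .foreachBatch(upsert_to_delta)
    .outputMode("update")
    .option("checkpointLocation", "abfss://checkpoint@mcfdatalake.dfs.core.windows.net/kjd/test/")
    .start())
query.awaitTermination()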

You could also write the micro-batches out to a Delta table and run a separate query against that Delta table to merge them into your other tables.
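A rough sketch of that second approach, where stream_df, staging_path and silver_path are placeholders for your own stream and locations (the merge condition reuses the columns from your query):

from delta.tables import DeltaTable

# Placeholder locations for a staging table and the final 'silver' table.
staging_path = 'abfss://stage@youraccount.dfs.core.windows.net/updates_staging'
silver_path = 'abfss://dump@youraccount.dfs.core.windows.net/silver'

# 1) Let the stream simply append its micro-batches to a staging Delta table.
#    partitionBy is allowed here because this is a plain Delta sink, not foreachBatch.
(stream_df.writeStream
    .format('delta')
    .partitionBy('year', 'month', 'day')
    .outputMode('append')
    .option('checkpointLocation', staging_path + '/_checkpoints')
    .start(staging_path))

# 2) In a separate batch job, merge the staged rows into the target table.
silver = DeltaTable.forPath(spark, silver_path)
staged = spark.read.format('delta').load(staging_path)
(silver.alias('r')
    .merge(staged.alias('u'), 'u.smtUidNr = r.smtUidNr')
    .whenMatchedUpdateAll(condition='u.msgTs > r.msgTs')
    .whenNotMatchedInsertAll()
    .execute())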

You can establish the partitioning when you first create the Delta table, and it will then be preserved on subsequent upserts. Here is an example:

from pyspark.sql.types import StructType, StructField, StringType
from delta.tables import DeltaTable

source_path = 'abfss://stage1np@youraccount.dfs.core.windows.net/example/student'
destination_path = 'abfss://stage2np@youraccount.dfs.core.windows.net/example/student'
# land some basic data in csv format
df1 = spark.createDataFrame([(1,'John','English','2021'), (2,'Jane','English','2021')], ['id', 'name', 'language', 'school_year'])
df1.write.format('csv').save(source_path, header='true')
# now process the initial data into a delta table, partitioned by 'school_year'
schema = StructType([StructField('id',StringType()),StructField('name',StringType()),StructField('language',StringType()),StructField('school_year',StringType())])
df = spark.readStream.load(source_path, format='csv', header='true', schema=schema)
query = df.writeStream.format('delta').outputMode('append').trigger(once=True).option('checkpointLocation', source_path + '/_checkpoints').partitionBy('school_year').start(destination_path)
query.awaitTermination()
spark.read.load(destination_path, format='delta').show()
# land additional inbound data with new and updated rows
df2 = spark.createDataFrame([(1,'Johnny','English','2021'), (3,'Jimmy','English','2022')], ['id', 'name', 'language', 'school_year'])
df2.write.format('csv').save(source_path, header='true', mode='append')
# now merge the new data in     
dt = DeltaTable.forPath(spark, destination_path)
def upsert(batch_df, batchId):
    dt.alias("current").merge(batch_df.alias("updates"), "current.id = updates.id").whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
query = df.writeStream.format("delta").foreachBatch(upsert).outputMode("update").trigger(once=True).option("checkpointLocation", source_path + '/_checkpoints').start(destination_path)
query.awaitTermination()
spark.read.load(destination_path, format='delta').show()

If you want to put this in a function and have it check whether a Delta table already exists, you can use:

DeltaTable.isDeltaTable(spark, destination_path)
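
For instance, a small wrapper along these lines, reusing spark and destination_path from the example above (the function name merge_into_delta and the choice of partition column are only illustrative):

from delta.tables import DeltaTable

def merge_into_delta(batch_df, batch_id):
    # First batch: no Delta table yet, so create it and establish the partitioning.
    if not DeltaTable.isDeltaTable(spark, destination_path):
        batch_df.write.format('delta').partitionBy('school_year').save(destination_path)
    else:
        # Later batches: upsert into the existing table; its partitioning is preserved.
        dt = DeltaTable.forPath(spark, destination_path)
        (dt.alias("current")
           .merge(batch_df.alias("updates"), "current.id = updates.id")
           .whenMatchedUpdateAll()
           .whenNotMatchedInsertAll()
           .execute())

Passing merge_into_delta to foreachBatch would then cover both the initial partitioned write and the subsequent upserts in a single streaming query.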
