Databricks Auto Loader with merge conditions



We have the following merge function. The MergeToDelta function makes sure we update records appropriately based on certain conditions. In the usage below you can see that we define the merge conditions and pass them into the function. This function is currently used in a batch job that we run once a day to process files.

# Define merge function
from delta.tables import DeltaTable

def MergeToDelta(df, path, merge_target_alias, merge_source_alias, merge_conditions):
    delta_table = DeltaTable.forPath(spark, path)
    (delta_table.alias(merge_target_alias)
        .merge(
            df.alias(merge_source_alias),
            merge_conditions)
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())
# Define merge conditions
merge_target_alias = 'target'
merge_source_alias = 'source'
merge_conditions = ('source.Id = target.Id AND ' +
                    'source.Name = target.Name AND ' +
                    'source.School = target.School AND ' +
                    'source.Age = target.Age AND ' +
                    'source.DepartmentId = target.DepartmentId AND ' +
                    'source.BirthDate = target.BirthDate AND ' +
                    'source.CallId = target.CallId')
some_schema = ''
some_path = ''
raw_df = (spark.read.schema(some_schema).json(some_path))
delta_data_path = '/mnt/students'
# Usage
MergeToDelta(raw_df, delta_data_path, merge_target_alias, merge_source_alias, merge_conditions)

With Auto Loader / Structured Streaming we use the writeStream function, and I don't see a way to pass merge conditions the way we do in the batch job. An example is below:

from pyspark.sql.functions import col, lit, to_date

raw_df = (spark.readStream
    .format("cloudFiles")
    .schema(file_schema)
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", autoloader_checkpoint_path)
    .load(path))
raw_df = (raw_df
    .withColumn('Id', lit(id))
    .withColumn('PartitionDate', to_date(col('BirthDate'))))
(raw_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", writestream_checkpoint_path)
    # .partitionBy(*partition_columns)
    .start(delta_data_path))

Maybe this is just a misunderstanding on my part of how streaming works in Databricks/Delta Live Tables, but is there a way to specify merge conditions when writing a stream to a Delta lake?

You need to use the foreachBatch writer. Example Python code:

def foreach_batch_function(df, epoch_id):
    pass  # MERGE HERE

raw_df.writeStream.foreachBatch(foreach_batch_function).start()
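
For completeness, here is a minimal sketch of how the batch merge defined earlier could be reused inside foreachBatch. It assumes the MergeToDelta function, the aliases, merge_conditions, delta_data_path and writestream_checkpoint_path from the question are in scope; the merge_batch name is just an illustrative helper, so treat this as one possible wiring rather than the definitive implementation:

# Each micro-batch arrives as an ordinary DataFrame, so the existing
# batch merge can be applied to it unchanged.
# merge_batch is a hypothetical helper name; the other names come from
# the code earlier in the question.
def merge_batch(micro_batch_df, epoch_id):
    MergeToDelta(micro_batch_df,
                 delta_data_path,
                 merge_target_alias,
                 merge_source_alias,
                 merge_conditions)

(raw_df.writeStream
    .foreachBatch(merge_batch)
    .option("checkpointLocation", writestream_checkpoint_path)
    .start())

Note that start() takes no path here: the sink is whatever the function writes to, and the Delta path is supplied through delta_data_path inside MergeToDelta.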
