We have the following merge function, which ensures that records are updated appropriately according to certain conditions. As you can see in the usage below, we define the merge conditions and pass them to the function. It is currently used in a batch job that we run once a day to process files.
from delta.tables import DeltaTable

# Define merge function
def MergeToDelta(df, path, merge_target_alias, merge_source_alias, merge_conditions):
    delta_table = DeltaTable.forPath(spark, path)
    (delta_table.alias(merge_target_alias)
        .merge(
            df.alias(merge_source_alias),
            merge_conditions)
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())
# Define merge conditions
merge_target_alias = 'target'
merge_source_alias = 'source'
merge_conditions = ('source.Id = target.Id AND ' +
                    'source.Name = target.Name AND ' +
                    'source.School = target.School AND ' +
                    'source.Age = target.Age AND ' +
                    'source.DepartmentId = target.DepartmentId AND ' +
                    'source.BirthDate = target.BirthDate AND ' +
                    'source.CallId = target.CallId')

some_schema = ''
some_path = ''
raw_df = (spark.read.schema(some_schema).json(some_path))
delta_data_path = '/mnt/students'

# Usage
MergeToDelta(raw_df, delta_data_path, merge_target_alias, merge_source_alias, merge_conditions)
With Auto Loader / streaming we use the writeStream function, and I don't see a way to pass merge conditions the way we do in batch. Example below:
raw_df = (spark.readStream
    .format("cloudFiles")
    .schema(file_schema)
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", autoloader_checkpoint_path)
    .load(path))

raw_df = (raw_df
    .withColumn('Id', lit(id))
    .withColumn('PartitionDate', to_date(col('BirthDate'))))
(raw_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", writestream_checkpoint_path)
    #.partitionBy(*partition_columns)
    .start(delta_data_path))
Maybe this is just a misunderstanding on my part of how streaming works in Databricks / Delta Live Tables, but is there a way to specify merge conditions when writing a stream to a Delta lake?
You need to use the foreachBatch writer; sample Python code below:
def foreach_batch_function(df, epoch_id):
    pass  # MERGE HERE

raw_df.writeStream.foreachBatch(foreach_batch_function).start()
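To make the stub concrete, here is a minimal sketch of what "MERGE HERE" could look like. It assumes the `MergeToDelta` function, `delta_data_path`, `merge_target_alias`, `merge_source_alias`, `merge_conditions`, and `writestream_checkpoint_path` from your batch code are already in scope; the handler name `merge_microbatch` is just illustrative:

```python
def merge_microbatch(batch_df, epoch_id):
    """Apply the same Delta merge to each streaming micro-batch.

    foreachBatch hands us every micro-batch as an ordinary
    DataFrame, so the existing batch MergeToDelta function (and
    its merge_conditions string) can be reused unchanged.
    """
    MergeToDelta(batch_df,
                 delta_data_path,
                 merge_target_alias,
                 merge_source_alias,
                 merge_conditions)
```

Wire it up with `raw_df.writeStream.foreachBatch(merge_microbatch).option("checkpointLocation", writestream_checkpoint_path).start()` — the checkpoint location still applies so the stream can recover, and note that you no longer call `.start(delta_data_path)` with a path, since the merge inside the handler performs the write to the Delta table.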