在Databricks中为参数化的笔记本创建一个模式字典



我在Databricks中有一个笔记本,如下图所示。

from pyspark.sql import functions as F
# Define the input path. Files looks like COMPANYX_20220113.csv
input_path = '/mnt/stackoverflow/source/COMPANYX*.csv'
# Define the output path
output_path = '/mnt/stackoverflow/raw/COMPANYX'
# Read in the CSV file
raw_df = spark.read.csv(path=input_path, schema=schema, sep=';', header=False, inferSchema=False)
# Write the DataFrame in the delta format - one time operation, commented out after first run
filtered_df.write.format("delta").mode("append").save(output_path)
# Create a delta table - one time operation, commented out after first run
spark.sql(f"CREATE TABLE IF NOT EXISTS stackoverflow.RAW_COMPANYX USING DELTA LOCATION '{output_path}'")
# Create temporary view to use as source table in the merge
filtered_df.createOrReplaceTempView("new_rows")
# Save the temporary table in the delta table using merge logic
spark.sql(f"MERGE INTO stackoverflow.RAW_COMPANYX d 
USING new_rows s 
ON d.DATUM = s.DATUM 
AND d.FORNR = s.FORNR 
AND d.AVD = s.AVD 
AND d.KBESTNR = s.KBESTNR 
AND d.VAGNNR = s.VAGNNR 
WHEN MATCHED THEN UPDATE SET * 
WHEN NOT MATCHED THEN INSERT * 
")

我的问题如下:这个笔记本应该参数化可以在source中着陆的不同csv文件。COMPANYX, COMPANYY和COMPANYZ都将他们的csv文件放在这里,但是他们都有不同的模式。

对于schema=schema,我想知道如何旋转从动态路径读取csv文件时使用的模式。

我正在考虑创建一个模式字典,并根据在我的情况下调用笔记本时从ADF发送的参数获取正确的键:值对。

你会怎么做呢?以上还有其他的反馈吗?

注意:我已经排除了上面脚本中使用pyspark.sql.functions的一些转换。

  1. 在Databricks中创建小部件并从ADF中读取数据
  2. 创建一个python函数,从你计划声明的模式字典中分配模式。
def check_file_name(input_path):
if input_path.split(".")[0].endswidth("X"):
schema = assign_x_schema
elif input_path.split(".")[0].endswidth("Y"):
schema = assign_y_schema
elif input_path.split(".")[0].endswidth("Z"):
schema = assign_Z_schema
return schema
schema = check_file_name(input_path)

可能有更好的方法,我正在考虑这个方法。

希望这对你有帮助!!

最新更新