我在Databricks中有一个笔记本,如下图所示。
from pyspark.sql import functions as F
# Define the input path. Files looks like COMPANYX_20220113.csv
input_path = '/mnt/stackoverflow/source/COMPANYX*.csv'
# Define the output path
output_path = '/mnt/stackoverflow/raw/COMPANYX'
# Read in the CSV file
raw_df = spark.read.csv(path=input_path, schema=schema, sep=';', header=False, inferSchema=False)
# Write the DataFrame in the delta format - one time operation, commented out after first run
filtered_df.write.format("delta").mode("append").save(output_path)
# Create a delta table - one time operation, commented out after first run
spark.sql(f"CREATE TABLE IF NOT EXISTS stackoverflow.RAW_COMPANYX USING DELTA LOCATION '{output_path}'")
# Create temporary view to use as source table in the merge
filtered_df.createOrReplaceTempView("new_rows")
# Save the temporary table in the delta table using merge logic
spark.sql(f"MERGE INTO stackoverflow.RAW_COMPANYX d
USING new_rows s
ON d.DATUM = s.DATUM
AND d.FORNR = s.FORNR
AND d.AVD = s.AVD
AND d.KBESTNR = s.KBESTNR
AND d.VAGNNR = s.VAGNNR
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
")
我的问题如下:这个笔记本应该参数化可以在source
中着陆的不同csv文件。COMPANYX, COMPANYY和COMPANYZ都将他们的csv文件放在这里,但是他们都有不同的模式。
对于schema=schema
,我想知道如何旋转从动态路径读取csv文件时使用的模式。
我正在考虑创建一个模式字典,并根据在我的情况下调用笔记本时从ADF发送的参数获取正确的键:值对。
你会怎么做呢?以上还有其他的反馈吗?
注意:我已经排除了上面脚本中使用pyspark.sql.functions
的一些转换。
- 在Databricks中创建小部件并从ADF中读取数据
- 创建一个python函数,从你计划声明的模式字典中分配模式。
def check_file_name(input_path):
if input_path.split(".")[0].endswidth("X"):
schema = assign_x_schema
elif input_path.split(".")[0].endswidth("Y"):
schema = assign_y_schema
elif input_path.split(".")[0].endswidth("Z"):
schema = assign_Z_schema
return schema
schema = check_file_name(input_path)
可能有更好的方法,我正在考虑这个方法。
希望这对你有帮助!!