我们在项目中使用kedro。通常,可以将数据集定义为:
client_table:
type: spark.SparkDataSet
filepath: ${base_path_spark}/${env}/client_table
file_format: parquet
save_args:
mode: overwrite
现在我们在数据块上运行,它们提供了许多优化,例如autoOptimizeShuffle
。我们正在考虑利用这一点来处理我们的15TB以上的数据集。
然而,我不清楚如何将kedro与databricks德尔塔湖解决方案一起使用
它为我们工作。
client_table:
type: kedro.contrib.io.pyspark.SparkDataSet
filepath: ${base_path_spark}/${env}/client_table
file_format: "delta"
save_args:
mode: overwrite
Kedro现在有了一个本地数据集,请参阅此处的文档:https://kedro.readthedocs.io/en/stable/tools_integration/pyspark.html#spark-和三角洲-湖泊相互作用
temperature:
type: spark.SparkDataSet
filepath: data/01_raw/data.csv
file_format: "csv"
load_args:
header: True
inferSchema: True
save_args:
sep: '|'
header: True
weather@spark:
type: spark.SparkDataSet
filepath: s3a://my_bucket/03_primary/weather
file_format: "delta"
save_args:
mode: "overwrite"
versionAsOf: 0
weather@delta:
type: spark.DeltaTableDataSet
filepath: s3a://my_bucket/03_primary/weather