azure databricks自动加载器与结构流



我们正面临一个读取和写入流数据到目标位置的问题。

我们正在使用一些JSON遥测数据来跟踪步骤。每5秒就有新的数据文件进入我们的三角洲湖。需要一种自动摄取到三角洲湖泊的方法。

希望对您有所帮助


query = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", <schemaLocation>)
.load(<dataset_source>)
.writeStream
.format("delta")
.option("checkpointLocation", <checkpoint_path>)
.trigger(processingTime="<Provide the time>")
.outputMode("append") # you can use complete if needed
.table("table_name"))

更多信息请参考:https://docs.databricks.com/ingestion/auto-loader/index.html

如果您想要读取特定子文件夹。对:这是我的文件位置/mnt/2023/01/13.我想读取2023/01内部数据,然后加载这样的load('/mnt/<folder>/<sub_folder>')/mnt/2023/*数据

query = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", <Location>)
.load('/mnt/<folder>/<sub_folder>')

最新更新