我们正面临一个读取和写入流数据到目标位置的问题。
我们正在使用一些JSON遥测数据来跟踪步骤。每5秒就有新的数据文件进入我们的三角洲湖。需要一种自动摄取到三角洲湖泊的方法。
希望对您有所帮助
query = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", <schemaLocation>)
.load(<dataset_source>)
.writeStream
.format("delta")
.option("checkpointLocation", <checkpoint_path>)
.trigger(processingTime="<Provide the time>")
.outputMode("append") # you can use complete if needed
.table("table_name"))
更多信息请参考:https://docs.databricks.com/ingestion/auto-loader/index.html
如果您想要读取特定子文件夹。对:这是我的文件位置/mnt/2023/01/13
.我想读取2023/01
内部数据,然后加载这样的load('/mnt/<folder>/<sub_folder>')
或/mnt/2023/*
数据
query = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", <Location>)
.load('/mnt/<folder>/<sub_folder>')