我正在使用Azure数据工厂中的自托管集成运行时将数据从内部部署源(普通文件系统)复制到Azure Blob存储目标。传输后,我想通过附加在Databricks集群上运行的笔记本来自动处理文件。管道运行良好,但我的问题涉及复制活动的输出。
是否有方法获取每次运行传输的文件和文件夹的信息我会将这些信息作为参数传递给笔记本。
查看文档,似乎只有汇总信息可用:
https://learn.microsoft.com/en-us/azure/data-factory/copy-activity-overview
如果你传输了大量的文件,这是有意义的。如果不可能,我想另一种方法是将复制过程留给自己,并根据存储帐户事件创建另一个管道?或者,将每次运行的新文件和文件夹信息存储在一个固定的文本文件中,同时传输,并在笔记本中读取?
如果您想从数据工厂获取要读取的文件或目录的信息,可以使用"获取元数据活动"来完成,请参阅以下答案以获取示例。
检测笔记本中新文件的另一种方法是使用具有文件源的结构化流。这非常有效,您只需在复制活动之后调用笔记本活动。
为此,您定义了一个流式输入数据帧:
streamingInputDF = (
spark
.readStream
.schema(pqtSchema)
.parquet(inputPath)
)
inputPath指向Blob存储中的输入目录。支持的文件格式有text、csv、json、orc、parquet,因此这取决于您的具体场景是否适用。
重要的是,在目标上,您使用触发器一次选项,因此笔记本不需要临时运行,例如:
streamingOutputDF
.repartition(1)
.writeStream
.format("parquet")
.partitionBy('Id')
.option("checkpointLocation", adlpath + "spark/checkpointlocation/data/trusted/sensorreadingsdelta")
.option("path", targetPath + "delta")
.trigger(once=True)
.start()
另一种方法可能是使用Azure队列存储(AQS),请参阅以下文档。
在这种情况下,解决方案实际上非常简单。我刚刚在Azure数据工厂中创建了另一个管道,该管道由Blob created事件触发,文件夹和文件名作为参数传递到我的笔记本。似乎工作得很好,只需要最少的配置或代码。基本的过滤可以通过事件完成,其余的由笔记本决定。
对于其他遇到这种情况的人,详细信息如下:
https://learn.microsoft.com/en-us/azure/data-factory/how-to-create-event-trigger