我的源是azure数据工厂,它正在将文件复制到containerA --> FolderA,FolderB, FolderC
。我使用下面的语法来使用自动加载器需要读取的任何一个文件夹中的文件。
装载我已经做了直到存储帐户
source = "abfss://containerA@storageaccount.dfs.core.windows.net/",
mount_point = "/mnt/containerA/",
extra_configs = configs)
流代码:
df1=spark.readStream.format("cloudFiles")
.option("cloudFiles.format","Json")
.option("cloudFiles.useNotifications","True")
.option('cloudFiles.subscriptionId',"xxxx-xxxx-xxxx-xxxx-xxx")
.option('cloudFiles.tenantId',"xxxx-1cxxxx98-xxxx-xxxx-xxxx")
.option("cloudFiles.clientId","xxxx-xx-46d8-xx-xxx")
.option("cloudFiles.clientSecret","xxxxxxxxxx")
.option('cloudFiles.resourceGroup',"xxxx-xxx")
.schema(Userdefineschema)
.load("/mnt/containerA/")
.withColumn("rawFilePath",input_file_name())
上面的语法是创建新的队列,如果我想给队列命名,总是有任何方法的。
当我启动流并且adf正在将数据复制到文件夹A时出现问题。流运行良好。但是当adf开始将数据复制到文件夹B时,流式查询没有获取存在于同一流式会话中的文件夹B中的记录。但当我关闭流式传输单元并再次启动时,它将为文件夹A和文件夹B选择数据。我的目标是当文件进入任何文件夹时使用自动加载器。流式传输将自动启动。
请告诉我,我是新来的火花流。
感谢Anuj gupta
请尝试使用执行嵌套文件夹文件查找
.option("recursiveFileLookup", "true")