我正在尝试创建一个数据工厂管道,其中一个活动在 databricks 活动中逐个注入文件名(来自容器或其他某个文件夹),以便按传入顺序进行处理。我如何实现它?
根据源类型和摄取文件的频率,也可能是使用 Spark 结构化流式处理的一种选择。对于流数据源,还支持将文件作为源 - 读取作为数据流写入目录中的文件。支持的文件格式包括文本、csv、json、orc、parquet(有关更新的列表以及每种文件格式支持的选项,请参阅 DataStreamReader 界面的文档)。请注意,文件必须以原子方式放置在给定的目录中,在大多数文件系统中,这可以通过文件移动操作来实现。
streamingInputDF = (
spark
.readStream # Similar to Batch just using `readStream` instead of `read`
.schema(jsonSchema)
.json(inputPath)
)
如果不想永久运行笔记本,请使用触发器一次选项。使用触发器 once 选项输出为可用数据写入一次,如果没有此选项输出流将永久运行:
streamingOutputDF
.coalesce(1)
.writeStream
.format("parquet")
.partitionBy('ingest_date')
.option("checkpointLocation", checkPointPath)
.option("path", targetPath)
.trigger(once=True)
.start()
在这种情况下,可以使用数据工厂触发不带参数的数据砖笔记本。
你好苏尔比·塔亚尔,感谢您的询问。 按照传入的顺序,我假设你的意思是发送到数据砖的第一个应该是第一个完成的,而不是并行处理。
为此,您将需要以下各项:
- 数组类型的管道变量。
- 一种使用文件名填充数组变量的机制。 它可能是 GetMetadata 活动或管道参数或其他内容。 如果您需要这方面的帮助,请添加更多详细信息。
- a 数据砖资源和链接服务
在管道中创建 ForEach 活动。 在设置中标记"顺序"选项。 如果不这样做,将导致您的活动并行发送,而不是逐个发送。 在设置中,引用"项目"中的数组变量。 表达式看起来像@variables('myVariableName')
. 在 ForEach 活动的活动中,放置 Databricks 类型的活动。 选项是"笔记本","Jar"和"Python"。 对于我的,我使用了笔记本。 笔记本更容易设置,因为 UI 的"浏览"选项。 将活动设置为首先使用相应的链接服务。 设置"Python 文件"/"笔记本路径"/"主类名"。 展开"参数"部分并添加新参数。 为参数指定与数据砖脚本中相同的名称。 该值应@string(item())
(如果枚举对象不是简单的基元数组,则可能会有所不同)。 这会从 ForEach 活动中获取项,并确保它是一个字符串。 如有必要,设置库。
尝试运行/调试时,请注意 Databrick 可能需要很长时间才能启动群集。 这会增加管道运行时间。