NiFi - 将 hdfs 中的文件移动到文件目录属性



我一直在尝试使用MoveHDFS处理器将镶木地板文件从hdfs中的/working/partition/目录移动到/success/partition/目录。分区值是根据流中前面的 ExecuteSparkJob 处理器设置的。在根/目录中找到我的镶木地板文件后,我在输出目录的处理器描述中找到了以下内容:

将文件移动到支持的 HDFS 目录 表达式语言:true(将使用变量注册表进行评估 仅(

原来处理器正在将文件发送到/而不是${dir}/

由于我的属性是根据 spark 处理结果动态设置的,因此我不能简单地添加到变量注册表并重新启动每个流文件的节点(根据我有限的理解,这是使用变量注册表所需要的(。 一种选择是使用带有自定义脚本的 ExecuteStreamCommand 处理器来完成此用例。这是我唯一的选择,还是有内置的方法将 HDFS 文件移动到属性集目录?

你可以试试这种方法:

步骤 1:使用 MoveHDFS将文件移动到临时位置,例如路径 X.MoveHDFS 处理器中的输入目录属性可以接受 flowfile 属性。

第 2 步:将成功连接到 FetchHDFS 处理器。

第 3 步:现在在 Fetch HDFS 处理器中,您可以将HDFS 文件名属性的表达式语言编写为 ${absolute.hdfs.path}/${文件名}。这会将文件数据从路径 X 提取到流文件内容中。

步骤4:将成功连接从FetchHDFS连接到PutHDFS处理器。

步骤 5:根据您的要求配置 PutHDFS 目录属性,以动态接受分区数据的流文件属性。

缺点: 这种方法的一个缺点是,将从moveHDFS创建的重复副本,以便在将数据发送到实际位置之前临时存储数据。如果不需要,您可能需要开发一个单独的流来删除重复的副本。

最新更新