我每分钟使用logstash
在我的hdfs
上抛出多个csv
文件。
我需要从当前时间获取过去一分钟的文件。
我在此过程中使用nifi
。
例如,现在是上午 11:30,我只需要获取 1 分钟前或上午 11:29 保存的所有文件。
这里使用nifi
的最佳方法是什么?
谢谢。
您可以检查以程结构。
ListHDFS-->RouteOnAttribute-->FetchHDFS
您可以使用ListHDFS,它列出了hdfs文件夹中的所有文件。
使用RouteOnAttribute通过将"08-23-17-11-29-AM">转换为毫秒(toNumber(((来检查文件名中存在的日期时间是否为前一分钟。
然后检查它是否等于当前日期时间的前几分钟的毫秒数,如下所示。${now((:toNumber((:minus(60000(}.
这里我们有负 1 分钟毫秒("60000"(与当前日期时间。
如果两者都相等,则继续将该队列进入FetchHDFS处理器,它将在前一分钟的文件中获取该特定文件。
如果您遇到任何问题,请告诉我。