在分布式nifi集群和电子邮件通知上合并流文件的问题



我使用的是Nifi 1.5版本,在nifi集群中有5个节点,我正在尝试使用SiteToSiteBulletinReportingTaskSiteToSiteStatusReportingTask实现集中式流量监控。概念思想是让报告任务将公告和状态发送到公共端口,并使用QueryRecord处理器检测特定事件并向团队发送电子邮件。例如,使用查询 -"select * from FLOWFILE where isBackPressureEnabled='true'"

但是我遇到的问题是,由于它的 5 节点集群,每当组件发生事件(例如连接)时,所有五个节点都会报告相同的事件。所以我有 5 个流文件分布在 5 个节点上,即将通过报告任务。因此,我的putEmail处理器为同一事件发送了 5 封电子邮件。这可能有点烦人。

到目前为止,流程看起来像 -

`InputPort` -> 
`SplitJson` ->
`EvaluateJson` -> // to differentiate between status &
bulletin messages since its a older version 
of nifi which doesnt have reporting.task.type 
attribute
RouteOnAttribute ->  
QueryRecord -> 
MergeContent -> // here I tried to merge with correlation 
attribute as componentId hoping it will 
merge all similar flowfiles together for same component
PutEmail

我尝试过合并记录,合并内容仅在主文件上执行,分区记录然后合并。甚至尝试GROUP BY处理器QueryRecord条款。但是我的策略都没有产生一个只会触发一封电子邮件的流文件。

如何实现为同一组件的同一事件仅发送一封电子邮件。有什么想法吗?

如果可以升级到 1.8 或更高版本,则可以使用策略设置为"单节点"的负载平衡连接,这种方式可以将 InputPort 之后的所有内容聚合到单个节点,然后您的 QueryRecord 和 MergeContent 将可以访问所有流文件。

我能够通过将所有节点中的所有事件流文件写入一个通用的 HDFS 路径,然后仅在Primary Node上使用ListHDFS+FetchHDFS来解决此问题。这使我只能在一个节点上抓取所有事件,删除所有重复的事件,并且只发送电子邮件一次。

最新更新