我使用的是Nifi 1.5版本,在nifi集群中有5个节点,我正在尝试使用SiteToSiteBulletinReportingTask
和SiteToSiteStatusReportingTask
实现集中式流量监控。概念思想是让报告任务将公告和状态发送到公共端口,并使用QueryRecord
处理器检测特定事件并向团队发送电子邮件。例如,使用查询 -"select * from FLOWFILE where isBackPressureEnabled='true'"
但是我遇到的问题是,由于它的 5 节点集群,每当组件发生事件(例如连接)时,所有五个节点都会报告相同的事件。所以我有 5 个流文件分布在 5 个节点上,即将通过报告任务。因此,我的putEmail
处理器为同一事件发送了 5 封电子邮件。这可能有点烦人。
到目前为止,流程看起来像 -
`InputPort` ->
`SplitJson` ->
`EvaluateJson` -> // to differentiate between status &
bulletin messages since its a older version
of nifi which doesnt have reporting.task.type
attribute
RouteOnAttribute ->
QueryRecord ->
MergeContent -> // here I tried to merge with correlation
attribute as componentId hoping it will
merge all similar flowfiles together for same component
PutEmail
我尝试过合并记录,合并内容仅在主文件上执行,分区记录然后合并。甚至尝试GROUP BY
处理器QueryRecord
条款。但是我的策略都没有产生一个只会触发一封电子邮件的流文件。
如何实现为同一组件的同一事件仅发送一封电子邮件。有什么想法吗?
如果可以升级到 1.8 或更高版本,则可以使用策略设置为"单节点"的负载平衡连接,这种方式可以将 InputPort 之后的所有内容聚合到单个节点,然后您的 QueryRecord 和 MergeContent 将可以访问所有流文件。
我能够通过将所有节点中的所有事件流文件写入一个通用的 HDFS 路径,然后仅在Primary Node
上使用ListHDFS
+FetchHDFS
来解决此问题。这使我只能在一个节点上抓取所有事件,删除所有重复的事件,并且只发送电子邮件一次。