NiFi 自定义处理器 - 读取数据库视图



我是NiFi的新手,我正在开发一个自定义处理器来从psql数据库视图中提取最新数据。初始化自定义处理器时,我可以使用下面的代码检索数据库视图。

private void GetData(){
Connection connection = DriverManager.getConnection("jdbc:postgresql://example:5432/example", "user", "pass");
Statement statement = connection.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
ResultSet rs = statement.executeQuery("SELECT * FROM Example_Table");
while(rs.next()){
//Get data from database
}
connection.close();
}

但是,我正在努力从数据库视图中获取最新的更新。主要问题是何时将新条目添加到数据库中。由于在初始化处理器时查询数据库,因此自定义处理器将没有新条目。

我尝试在公共 void onTrigger() 函数中实现查询;但是这会导致管道备份,因为它将查询每个流文件上的数据库(如果每秒有数千个流文件进入,这并不理想)。

有没有一种方法可以在处理器启动时查询数据库;而无需在每个流文件上查询数据库?或者,是否可以检测数据库是否已被修改并在修改时提取数据?甚至设置一个计时器来拉取自定义处理器中的数据库?

非常感谢任何帮助,提前感谢您。

我认为如果您能更多地解释有关更高级别的用例,它可能会帮助您获得解决方案,因为这似乎是一种不常见的方法。通常每个处理器都有单一的责任,因此一些处理器与数据库交互,然后输出必要的信息供其他处理器使用。

有一些LookupService可能是可以检查的好例子,例如MongoDBLookupService

如果您的用例实际上是"我有一个自定义处理器,它摄取包含任意数据的流文件,并且需要使用此数据库表中的最新数据对它们执行一些操作",那么您有以下几种选择:

  1. 使用类似于上述方法执行数据库查询,并在onEnabled()期间调用该方法一次以从表中获取大部分数据,然后使用线程定期调用它以保持更新并将结果本地存储在字段中。当onTrigger()方法运行时,请使用本地缓存结果,而不是进行数据库调用。这将减少延迟并为您提供近乎实时的数据。请务必通过带有@OnStopped注释的方法清理线程运行程序和本地状态。
  2. 执行与流文件处理内联的数据库查询(即onTrigger())。这可能会导致高延迟和吞吐量阻塞。如果能够使用List<FlowFile> flowfiles = session.get(1000);对流文件进行批处理,则可能会增加每个执行周期中处理的流文件数(该数量是可配置的)。
  3. 如果没有更新插入/就地修改(即对数据库表的任何更改都会导致行),则可以使用 sentinel 查询 (SELECT COUNT(*) FROM table;) 返回行数,将其与之前返回的行数进行比较,并且仅在这些数字不同时执行"昂贵"查询检索所有数据。在这种情况下,您可以通过记录以前获取的行的最大 ID 或时间戳来仅检索增量行。如果可以更新插入,类似SELECT MAX(lastModified) AS mostRecentTimeModified FROM table;的东西可能会有所帮助。

最新更新