如何使用NIFI/HDF从MS SQL读取Delta记录



我在MS SQL中有几个表,这些表每秒都会更新,并且或多或少地查询看起来像

SELECT G_ID,UpdateTime,ID,Name,T_NAME FROM TABLE1 AS table1 INNER JOIN TABLE2 AS table2 ON table1.IP=table2.ID 
WHERE table2.UpdateTime >= ${lastUpdateTime} AND table2.G_ID > ${lastID}

假设选择内的联接查询结果5记录如下所示。

如果查询首次运行${lastUpdateTime},并且${lastG_ID}设置为0,并且它将返回5个记录以下。处理记录后,查询将存储max(G_ID)即5和max(UpdateTime),即1512010479在etl_stat表中。

 G_ID       UpdateTime   ID            Name             T_NAME 
-------------------------------------------------------------------
 1          1512010470  12591225      DUMMY_DATA       DUMMY_ID    
 2          1512096873  12591538      DUMMY_DATA       DUMMY_ID    
 3          1512096875  12591539      DUMMY_DATA       DUMMY_ID    
 4          1512010477  12591226      DUMMY_DATA       DUMMY_ID    
 5          1512010479  12591227      DUMMY_DATA       DUMMY_ID    

如果表添加另外5个新记录,如下所示:

 G_ID       UpdateTime   ID            Name             T_NAME 
-------------------------------------------------------------------
 1          1512010470  12591225      DUMMY_DATA       DUMMY_ID    
 2          1512096873  12591538      DUMMY_DATA       DUMMY_ID    
 3          1512096875  12591539      DUMMY_DATA       DUMMY_ID    
 4          1512010477  12591226      DUMMY_DATA       DUMMY_ID    
 5          1512010479  12591227      DUMMY_DATA       DUMMY_ID 
 6          1512010480  12591230      DUMMY_DATA       DUMMY_ID 
 7          1512010485  12591231      DUMMY_DATA       DUMMY_ID 
 8          1512010490  12591232      DUMMY_DATA       DUMMY_ID 
 9          1512010493  12591233      DUMMY_DATA       DUMMY_ID 
 10         1512010500  12591234      DUMMY_DATA       DUMMY_ID 

查询将首先从etl_stat table读取max(G_ID)max(UpdateTime),并将查询如下。 SELECT G_ID,UpdateTime,ID,Name,T_NAME FROM TABLE1 AS table1 INNER JOIN TABLE2 AS table2 ON table1.IP=table2.ID WHERE table2.UpdateTime >= 1512010479 AND table2.G_ID > 5,因此查询仅返回5个Delta记录,如下所示。

G_ID        UpdateTime   ID            Name             T_NAME 
-------------------------------------------------------------------
 6          1512010480  12591230      DUMMY_DATA       DUMMY_ID 
 7          1512010485  12591231      DUMMY_DATA       DUMMY_ID 
 8          1512010490  12591232      DUMMY_DATA       DUMMY_ID 
 9          1512010493  12591233      DUMMY_DATA       DUMMY_ID 
 10         1512010500  12591234      DUMMY_DATA       DUMMY_ID 

因此,每次查询运行时,都应首先从etl_stat表读取max(G_ID)max(UpdateTime),然后将"选择内联接查询"框架如上图所示,并进行Delta更改。

使用Spark SQL

架构

我已经实现了上述用例,如下所示:

1)spark jdbc读取凤凰表以从 etl_stat表中获取 max(G_ID)max(UpdateTime)

2)spark jdbc框架选择内在查询,例如 SELECT G_ID,UpdateTime,ID,Name,T_NAME FROM TABLE1 AS table1 INNER JOIN TABLE2 AS table2 ON table1.IP=table2.ID WHERE table2.UpdateTime >= 1512010479 AND table2.G_ID > 5

3)SPARK JDBC运行步骤2内JOIN查询,从MS SQL Server过程中读取Delta消息,记录并插入HBase。

4)成功插入HBase后,Spark更新了使用最新G_IDetl_stat表,即10和UpdateTime即1512010500。

5)这项工作是安排每1分钟运行一次的Cron。

使用Nifi

是架构

我想将此用例移至nifi,我想使用NIFI从MS SQL DB中读取记录,然后将此记录发送到Kafka。

成功发布到Kafka后,NIFI将在数据库中保存G_ID和UpdateTime。

一旦消息将到达Kafka,Spark Streaming将读取Kafka的消息,并使用现有业务逻辑保存到HBase。

在每个运行中,NIFI处理器都应使用max(G_ID)max(UpdateTime)选择内部加入查询,以获取Delta Records并发布到Kafka。

我是NIFI/HDF的新手。我需要您的帮助和指导才能使用NIFI/HDF实施此功能。如果您对此用例有更好的解决方案/体系结构,请建议。

很抱歉这么长的帖子。

您所描述的是JDBC Kafka Connector从开箱即用的操作。设置您的配置文件,加载它,离开。完毕。Kafka Connect是Apache Kafka的一部分。不需要额外的工具和技术。

您可能还需要考虑适当的更改数据捕获(CDC)。对于专有RDBMS(Oracle,DB2,MS SQL等),您拥有Goldengate,Attunity,DBVisit等商业工具。对于开源RDBMS(例如MySQL,PostgreSQL),您应该查看开源Debezium工具。所有这些CDC工具都直接与Kafka集成。

最新更新