Flink:binlog到多个DTO的转换以及Flink中的转换方法



Kafka、Flink和Tidb的更新版本。假设我有三个源MySql表s_as_bs_c,并且希望实时收集记录到目标TiDb表t_at_b。映射规则是

`s_a`  --> `t_a`                   
`s_b` union `s_c`   ---> `t_b`  with some transformation (e.g., field remapping).

我采用的解决方案是kafka+Flink和Tidb接收器,其中binlog更改被订阅到kafka主题;Flink使用主题并将转换后的结果写入Tidb。对我来说,在flink代码部分的问题是:

  1. 如何轻松地将从kafka轮询的json字符串(包含操作、表的信息(恢复为不同类型的DTO操作(例如,插入/创建t_at_b(。我发现了一个名为CCD_ 8的工具,名为Kafka&Flink连接器,但看起来它需要源表和目标表之间的相等性。

  2. 如果我有多个目标表,如何编写转换VKDataMapper。我很难定义T,因为它可以是t_aDTO(数据传输对象(或t_bDTO。

我现有的示例代码如下:

//主程序。

StreamExecutionEnvironment environment =
StreamExecutionEnvironment.getExecutionEnvironment();
//consume is FlinkkafkaConsumer. TopicFilter returns true. 
environment.addSource(consumer).filter(new TopicFilter()).map(new VKDataMapper())
.addSink(new TidbSink());
try {
environment.execute();
} catch (Exception e) {
log.error("exception {}", e);
}


public class VKDataMapper implements MapFunction<String, T> {
@Override
public T map(String value) throws Exception {
//How T can represents both `T_a data DTO` `T_b`...., 
return null;
}
}

为什么不试试Flink SQL?这样,您只需要在Flink中创建一些表,然后通过类似的sql来定义您的任务

insert into t_a select * from s_a;
insert into t_b select * from s_b union select * from s_c;

请参阅中的一些示例https://github.com/LittleFall/flink-tidb-rdw,随便问任何让你困惑的问题。

相关内容

  • 没有找到相关文章

最新更新