Kafka、Flink和Tidb的更新版本。假设我有三个源MySql表s_a
、s_b
和s_c
,并且希望实时收集记录到目标TiDb表t_a
和t_b
。映射规则是
`s_a` --> `t_a`
`s_b` union `s_c` ---> `t_b` with some transformation (e.g., field remapping).
我采用的解决方案是kafka+Flink和Tidb接收器,其中binlog更改被订阅到kafka主题;Flink使用主题并将转换后的结果写入Tidb。对我来说,在flink代码部分的问题是:
如何轻松地将从kafka轮询的json字符串(包含操作、表的信息(恢复为不同类型的DTO操作(例如,插入/创建
t_a
或t_b
(。我发现了一个名为CCD_ 8的工具,名为Kafka&Flink连接器,但看起来它需要源表和目标表之间的相等性。如果我有多个目标表,如何编写转换
VKDataMapper
。我很难定义T
,因为它可以是t_a
DTO(数据传输对象(或t_b
DTO。
我现有的示例代码如下:
//主程序。
StreamExecutionEnvironment environment =
StreamExecutionEnvironment.getExecutionEnvironment();
//consume is FlinkkafkaConsumer. TopicFilter returns true.
environment.addSource(consumer).filter(new TopicFilter()).map(new VKDataMapper())
.addSink(new TidbSink());
try {
environment.execute();
} catch (Exception e) {
log.error("exception {}", e);
}
public class VKDataMapper implements MapFunction<String, T> {
@Override
public T map(String value) throws Exception {
//How T can represents both `T_a data DTO` `T_b`....,
return null;
}
}
为什么不试试Flink SQL?这样,您只需要在Flink中创建一些表,然后通过类似的sql来定义您的任务
insert into t_a select * from s_a;
insert into t_b select * from s_b union select * from s_c;
请参阅中的一些示例https://github.com/LittleFall/flink-tidb-rdw,随便问任何让你困惑的问题。