>我有一个包含不同大小记录的单一源CSV文件,该文件将每条记录推送到一个源主题中。我想将记录从该源主题拆分为不同的 KStreams/KTables。我有一个用于一个表加载的管道,其中我将记录从源主题以分隔格式推送到 stream1,然后将记录推送到 AVRO 格式的另一个流中,然后将其推送到 JDBC 接收器连接器中,该连接器将记录推送到 MySQL 数据库中。管道需要相同。但是我想将不同表的记录推送到一个源主题中,然后根据一个值将记录拆分为不同的流。这可能吗?我试图寻找方法来做到这一点,但不能。我也可以以某种方式改进管道或使用 KTable 而不是 KStreams 或任何其他修改吗?
我的电流 - 一个源 CSV 文件 (source.csv
) -> 源主题 (名称 - 包含 Test1 记录的源主题) -> 流 1(分隔值格式) -> 流 2(作为 AVRO 值格式) -> 结束主题(名称 -sink-db-test1
) -> JDBC 接收器连接器 -> MySQL 数据库(名称 -test1
)
我有一个不同的MySQL表test2
具有不同的架构,并且该表的记录也存在于source.csv
文件中。由于模式不同,我无法按照当前的test1
管道将数据插入test2
表中。
例- 在 CSV 源文件中,
line 1 - 9,atm,mun,ronaldo
line 2- 10,atm,mun,bravo,num2
line 3 - 11,atm,sign,bravo,sick
在此示例中,要拆分的值为column 4
(ronaldo
或bravo
) 所有这些数据应分别加载到table 1
、table 2
table 3
键是第 4 列。
if col4==ronaldo, go to table 1
if col4==bravo and col3==mun, go to table 2
if col4==bravo and col3 ==sign go to table 3
我对Kafka很陌生,从上周开始Kafka开发。
您可以使用KStream#branch()
运算符编写单独的 Kafka Streams 应用程序,将记录从输入主题拆分到不同的 KStream 或输出主题:
KStream<K, V>[] branches = streamsBuilder.branch(
(key, value) -> {filter logic for topic 1 here},
(key, value) -> {filter logic for topic 2 here},
(key, value) -> true//get all messages for this branch
);
// KStream branches[0] records for logic 1
// KStream branches[1] records for logic 2
// KStream branches[2] records for logic 3
或者你可以像这样手动分支你的 KStream:
KStream<K, V> inputKStream = streamsBuilder.stream("your_input_topic", Consumed.with(keySerde, valueSerdes));
inputKStream
.filter((key, value) -> {filter logic for topic 1 here})
.to("your_1st_output_topic");
inputKStream
.filter((key, value) -> {filter logic for topic 2 here})
.to("your_2nd_output_topic");
...
我能够拆分数据并使用 KSQL 进行我在下面分享的方法。 1. 创建输入流,value_format='JSON'
和列payload
STRING
2. 有效负载将包含整个记录作为STRING
3. 然后使用WHERE
子句中的运算符将记录拆分为不同的流LIKE
同时根据需要将有效负载放入不同的流中。在这里,我使用了 KSQLSPLIT
运算符从逗号分隔格式的有效负载中获取记录