如何将记录拆分为不同的流,从一个主题到不同的流?



>我有一个包含不同大小记录的单一源CSV文件,该文件将每条记录推送到一个源主题中。我想将记录从该源主题拆分为不同的 KStreams/KTables。我有一个用于一个表加载的管道,其中我将记录从源主题以分隔格式推送到 stream1,然后将记录推送到 AVRO 格式的另一个流中,然后将其推送到 JDBC 接收器连接器中,该连接器将记录推送到 MySQL 数据库中。管道需要相同。但是我想将不同表的记录推送到一个源主题中,然后根据一个值将记录拆分为不同的流。这可能吗?我试图寻找方法来做到这一点,但不能。我也可以以某种方式改进管道或使用 KTable 而不是 KStreams 或任何其他修改吗?

我的电流 - 一个源 CSV 文件 (source.csv) -> 源主题 (名称 - 包含 Test1 记录的源主题) -> 流 1(分隔值格式) -> 流 2(作为 AVRO 值格式) -> 结束主题(名称 -sink-db-test1) -> JDBC 接收器连接器 -> MySQL 数据库(名称 -test1)

我有一个不同的MySQL表test2具有不同的架构,并且该表的记录也存在于source.csv文件中。由于模式不同,我无法按照当前的test1管道将数据插入test2表中。

例- 在 CSV 源文件中,

line 1 - 9,atm,mun,ronaldo line 2- 10,atm,mun,bravo,num2 line 3 - 11,atm,sign,bravo,sick

在此示例中,要拆分的值为column 4(ronaldobravo) 所有这些数据应分别加载到table 1table 2table 3键是第 4 列。

if col4==ronaldo, go to table 1 if col4==bravo and col3==mun, go to table 2 if col4==bravo and col3 ==sign go to table 3

我对Kafka很陌生,从上周开始Kafka开发。

您可以使用KStream#branch()运算符编写单独的 Kafka Streams 应用程序,将记录从输入主题拆分到不同的 KStream 或输出主题:

KStream<K, V>[] branches = streamsBuilder.branch(
(key, value) -> {filter logic for topic 1 here},
(key, value) -> {filter logic for topic 2 here},
(key, value) -> true//get all messages for this branch
);
// KStream branches[0] records for logic 1
// KStream branches[1] records for logic 2
// KStream branches[2] records for logic 3

或者你可以像这样手动分支你的 KStream:

KStream<K, V> inputKStream = streamsBuilder.stream("your_input_topic", Consumed.with(keySerde, valueSerdes));
inputKStream
.filter((key, value) -> {filter logic for topic 1 here})
.to("your_1st_output_topic");
inputKStream
.filter((key, value) -> {filter logic for topic 2 here})
.to("your_2nd_output_topic");
...

我能够拆分数据并使用 KSQL 进行我在下面分享的方法。 1. 创建输入流,value_format='JSON'和列payloadSTRING2. 有效负载将包含整个记录作为STRING3. 然后使用WHERE子句中的运算符将记录拆分为不同的流LIKE同时根据需要将有效负载放入不同的流中。在这里,我使用了 KSQLSPLIT运算符从逗号分隔格式的有效负载中获取记录

相关内容

  • 没有找到相关文章

最新更新