如何将从一个主题创建的流连接到从另一个主题派生的 KTable 派生(作为聚合操作)



问题:如何将从TOPIC_2(在步骤2中(创建的流连接到KTable stateTable(在格式的步骤1中(。

目标:在连接操作后,如果我们更改 AlarmState(KTable stateTable 的值(对象的状态,相同的状态应该反映在 stateTable(步骤 1 的一部分(中

在 Step1 中描述了 KTable(作为状态表((从 TOPIC_1 创建(还有另一个主题TOPIC_2生成数据(在步骤2中(状态表的键和TOPIC_2中生成的数据相同

步骤1.

final KStream<String, MetricBasicMessage> basicMsgStream = builder.stream("TOPIC_1",
                Consumed.with(Serdes.String(), new JSONSerde<>()));
KTable <String, AlarmState> stateTable = 
         builder.stream("TOPIC_1",Consumed.with(Serdes.String(), new JSONSerde<>()))
                .flatMapValues(...)
                .filter(...)
                .map(...)
                .groupByKey(...)
                .aggregate(...);
final KafkaStreams streams = new KafkaStreams(builder.build(), <streamsConfiguration>);
        streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

步骤2.

String keyToJoinWithState = key.substring(0, index);
producer.send("TOPIC_2", keyToJoinWithState, new NotificationMessage(taskType, thresh),"NOTIIFCATION_MESSAGE");
如果你想

用一些表加入流,你只需要调用

KStream::join(final KTable<K, VT> table, final ValueJoiner<? super V, ? super VT, ? extends VR> joiner);

它将是这样的:

KStream<String, String> stream2 = builder.<String, NotificationMessage >stream("TOPIC_2", Consumed.with(Serdes.String(), new NotificationMessageSerdes()));
stream2.join(stateTable, (v1, v2) -> ??? /* How to join values from Stream and KTable */).to("output2");

相关内容

  • 没有找到相关文章

最新更新