问题:如何将从TOPIC_2(在步骤2中(创建的流连接到KTable stateTable(在格式的步骤1中(。
目标:在连接操作后,如果我们更改 AlarmState(KTable stateTable 的值(对象的状态,相同的状态应该反映在 stateTable(步骤 1 的一部分(中
在 Step1 中描述了 KTable(作为状态表((从 TOPIC_1 创建(还有另一个主题TOPIC_2生成数据(在步骤2中(状态表的键和TOPIC_2中生成的数据相同
步骤1.
final KStream<String, MetricBasicMessage> basicMsgStream = builder.stream("TOPIC_1",
Consumed.with(Serdes.String(), new JSONSerde<>()));
KTable <String, AlarmState> stateTable =
builder.stream("TOPIC_1",Consumed.with(Serdes.String(), new JSONSerde<>()))
.flatMapValues(...)
.filter(...)
.map(...)
.groupByKey(...)
.aggregate(...);
final KafkaStreams streams = new KafkaStreams(builder.build(), <streamsConfiguration>);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
步骤2.
String keyToJoinWithState = key.substring(0, index);
producer.send("TOPIC_2", keyToJoinWithState, new NotificationMessage(taskType, thresh),"NOTIIFCATION_MESSAGE");
用一些表加入流,你只需要调用
KStream::join(final KTable<K, VT> table, final ValueJoiner<? super V, ? super VT, ? extends VR> joiner);
它将是这样的:
KStream<String, String> stream2 = builder.<String, NotificationMessage >stream("TOPIC_2", Consumed.with(Serdes.String(), new NotificationMessageSerdes()));
stream2.join(stateTable, (v1, v2) -> ??? /* How to join values from Stream and KTable */).to("output2");