KStream left使用相同的密钥加入KStream



我有一个问题,我试图左联接 2 个流。 合并键有 100+ 具有相同键的列表,而 DataStream 只有 1 个列表与 mergedKey 具有相同的密钥。我想在 endStream 值中从 mergedKey 与 DataStream 合并。

//get DataStream 
final KStream<String, GenericRecord> DataStream = builder.stream("Datastreams");
// Transform merged to Equals Keys to DataStream.Iot
final KStream<String, GenericRecord> mergedKey = mergedFoIObs
.map((key, value) -> KeyValue.pair(value.get("Datastream").toString(), value)); 
// Join the DataStream with MergedStream

final KStream<String, String> mergedFoIObsData = mergedKey.leftJoin(
DataStream,
(value, data) -> {
try {
if(data != null{
value.put("Datastream", data.toString());
JSONObject jo = (JSONObject) new JSONParser().parse(value.toString());
return jo.toJSONString();}
return null

} catch (ParseException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return null;
}, JoinWindows.of(10000));

但我的问题是,在 Endstream 中,我只得到 1 个具有正确值的列表,而其他列表的值为 null。

Taht 表示数据在第一"轮"后为空。

当我从 DataStream 转换为 KTable 时,我遇到了一个问题,我得到了正确的列表,但只有 37 个列表,所以 60 个是错过的。

我希望你能帮助我。

对于 KStream-KStream 联接,取决于记录的时间戳是否加入。 查看此博客文章以获取更多详细信息:https://www.confluent.io/blog/crossing-streams-joins-apache-kafka/

对于 KStream-KTable 联接,这取决于何时将 KTable 记录加载到 KTable 中 - Kafka Streams 尝试根据时间戳同步加载,但这是一种尽力而为的方法。因此,可能会首先处理某些 KStream 记录,而 KTable 仍然为空。只有在处理了 KTable 记录(即,更新并包含该记录的 KTable(之后,连续的 KStream 记录才会成功加入。

请注意,下一个 Kafka 版本 2.1 将改进此时间戳同步并提供更强的保证,用户甚至可以配置保证的严格程度。

"此外,对于此 KStream 的每个不满足连接谓词的输入记录,将使用另一个流的空值调用提供的 ValueJoiner。

因此,当数据(右值(为 null 时,不应返回 null,而应返回值(左值(。

左侧的卡夫卡文档链接加入

最新更新