KStream.to() 和 StreamsBuilder.table() 可以在 Kafka Stream 中使用相同



正如标题所示,Java 流代码如下所示:

StreamsBuilder builder = new StreamsBuilder();
KStream<String, City> citesStream = builder.stream("cities"
, Consumed.with(Serdes.String(), SerdesFactory.serdesFrom(City.class)));
citesStream.filter((name, city) -> city.getParentId() != 0).to("citiesExcludeProvince"
, Produced.with(Serdes.String(), SerdesFactory.serdesFrom(City.class)));
KTable<String, City> allCityTable = builder.table("citiesExcludeProvince"
, Consumed.with(Serdes.String(), SerdesFactory.serdesFrom(City.class)));

我想过滤一些城市并保存到另一个 Kafka 主题,然后将其读取为 KTable 以便加入,如下所示:

KStream<String, City> provinceStream = citesStream
.filter((name, city) -> city.getParentId() == 0);
provinceStream.leftJoin(allCityTable, (province, city) -> {
System.out.println(JsonUtil.objectToJson(province));
System.out.println(JsonUtil.objectToJson(city));
if (province != null && city != null) {
if (city.getParentId() == province.getId()) {
if (province.getChildren() == null) {
province.setChildren(Lists.newArrayList());
}
province.getChildren().add(city);
}
}
return province;
}).to("provinceWithCity", Produced.with(Serdes.String(), SerdesFactory.serdesFrom(City.class)));

但我明白citiesExcludeProvince话题总是空的。错误在哪里?

KStream.to() 和

StreamsBuilder.table() 可以在 Kafka Stream 中使用相同的主题和同一个 StreamsBuilder 吗?

是的,您可以将输入主题用于StreamsBuilder.table(),该输入主题是KStream.to().StreamsBuilder的输出主题,不允许某些类型的循环,但允许这些循环(贯穿某些主题)。在这方面,我认为您的代码没有任何问题。


我想过滤一些城市并保存到另一个卡夫卡主题,然后将其读取为 KTable 以便加入如下......但是我得到城市排除省主题总是空的。错误在哪里?

您的代码存在几个问题:

  1. 到达联接的城市不按省 ID 进行键控。所以加入永远不会发生。
  2. 如果城市按省 ID 键入,则到达右侧表的每个城市都将覆盖之前到达右侧的任何城市。这是因为表是按键值的更改日志。如果按省 ID 键入的流中有多个属于某个省的城市,则在表中,您只会看到最后一个到达的城市。
  3. 右侧表不会触发计算。这是一个KStream-KTable连接,这种连接的语义是左侧原因处理中唯一的事件。右侧的事件仅存储在表中。(在相关的说明中,您不能真正使用KStream-KTable联接来处理历史数据。当您打开 Kafka Streams 应用程序时,它有一个读取所有输入主题的使用者。如果它在allCityTable的内容之前读取创建provinceStream的主题,那么您的省份将不会在allCityTable中找到任何内容,因为它仍然是空的。
  4. 左侧永远不会null(您不必进行该检查)。

我认为这就是您要找的:

// Step 1
KTable<String, ArrayList<City>> citiesByProvince = citesStream
.filter((name, city) -> city.getParentId() != 0)
.groupBy((k, v) -> v.getParentId())
.aggregate(ArrayList::new,
(k, v, a) -> {
a.add(v);
return a;
});
// Step 2
provinceStream
.groupBy((k, v) -> v.getId())
.reduce((a, b) -> a)
.join(citiesByProvince, (province, cities) -> {
province.setChildren(cities);
return province;
});
  • 第 1 步:按省 ID 将所有城市汇总到一个列表中。生成的列表按省 ID 键入。
  • 第 2 步:将省份转换为按省 ID 键控的表(您可以通过将provinceStream的内容写入主题然后使用StreamBuilder.table()来等效地执行此操作,但groupBy()->reduce()在这里执行相同的操作),然后执行连接。

KStream-KTable联接不同,KTable-KTable联接对记录从底层使用者到达的顺序不敏感,因此您将获得确定性结果。

相关内容

  • 没有找到相关文章

最新更新