正如标题所示,Java 流代码如下所示:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, City> citesStream = builder.stream("cities"
, Consumed.with(Serdes.String(), SerdesFactory.serdesFrom(City.class)));
citesStream.filter((name, city) -> city.getParentId() != 0).to("citiesExcludeProvince"
, Produced.with(Serdes.String(), SerdesFactory.serdesFrom(City.class)));
KTable<String, City> allCityTable = builder.table("citiesExcludeProvince"
, Consumed.with(Serdes.String(), SerdesFactory.serdesFrom(City.class)));
我想过滤一些城市并保存到另一个 Kafka 主题,然后将其读取为 KTable 以便加入,如下所示:
KStream<String, City> provinceStream = citesStream
.filter((name, city) -> city.getParentId() == 0);
provinceStream.leftJoin(allCityTable, (province, city) -> {
System.out.println(JsonUtil.objectToJson(province));
System.out.println(JsonUtil.objectToJson(city));
if (province != null && city != null) {
if (city.getParentId() == province.getId()) {
if (province.getChildren() == null) {
province.setChildren(Lists.newArrayList());
}
province.getChildren().add(city);
}
}
return province;
}).to("provinceWithCity", Produced.with(Serdes.String(), SerdesFactory.serdesFrom(City.class)));
但我明白citiesExcludeProvince
话题总是空的。错误在哪里?
StreamsBuilder.table() 可以在 Kafka Stream 中使用相同的主题和同一个 StreamsBuilder 吗?
是的,您可以将输入主题用于StreamsBuilder.table()
,该输入主题是KStream.to().
StreamsBuilder
的输出主题,不允许某些类型的循环,但允许这些循环(贯穿某些主题)。在这方面,我认为您的代码没有任何问题。
我想过滤一些城市并保存到另一个卡夫卡主题,然后将其读取为 KTable 以便加入如下......但是我得到城市排除省主题总是空的。错误在哪里?
您的代码存在几个问题:
- 到达联接的城市不按省 ID 进行键控。所以加入永远不会发生。
- 如果城市按省 ID 键入,则到达右侧表的每个城市都将覆盖之前到达右侧的任何城市。这是因为表是按键值的更改日志。如果按省 ID 键入的流中有多个属于某个省的城市,则在表中,您只会看到最后一个到达的城市。
- 右侧表不会触发计算。这是一个
KStream
-KTable
连接,这种连接的语义是左侧原因处理中唯一的事件。右侧的事件仅存储在表中。(在相关的说明中,您不能真正使用KStream
-KTable
联接来处理历史数据。当您打开 Kafka Streams 应用程序时,它有一个读取所有输入主题的使用者。如果它在allCityTable
的内容之前读取创建provinceStream
的主题,那么您的省份将不会在allCityTable
中找到任何内容,因为它仍然是空的。 - 左侧永远不会
null
(您不必进行该检查)。
我认为这就是您要找的:
// Step 1
KTable<String, ArrayList<City>> citiesByProvince = citesStream
.filter((name, city) -> city.getParentId() != 0)
.groupBy((k, v) -> v.getParentId())
.aggregate(ArrayList::new,
(k, v, a) -> {
a.add(v);
return a;
});
// Step 2
provinceStream
.groupBy((k, v) -> v.getId())
.reduce((a, b) -> a)
.join(citiesByProvince, (province, cities) -> {
province.setChildren(cities);
return province;
});
- 第 1 步:按省 ID 将所有城市汇总到一个列表中。生成的列表按省 ID 键入。
- 第 2 步:将省份转换为按省 ID 键控的表(您可以通过将
provinceStream
的内容写入主题然后使用StreamBuilder.table()
来等效地执行此操作,但groupBy()
->reduce()
在这里执行相同的操作),然后执行连接。
与KStream-KTable
联接不同,KTable-KTable
联接对记录从底层使用者到达的顺序不敏感,因此您将获得确定性结果。