如何在Kafka流中有效地链接来自扁平api数据的groupby查询?



我有一些来自API到Kafka主题的随机数据,看起来像这样:

{"vin": "1N6AA0CA7CN040747", "make": "Nissan", "model": "Pathfinder", "year": 1993, "color": "Blue", "salePrice": "$58312.28", "city": "New York City", "state": "New York", "zipCode": "10014"}
{"vin": "1FTEX1C88AF678435", "make": "Audi", "model": "200", "year": 1991, "color": "Aquamarine", "salePrice": "$65651.53", "city": "Newport Beach", "state": "California", "zipCode": "92662"}
{"vin": "JN8AS1MU1BM237985", "make": "Subaru", "model": "Legacy", "year": 1990, "color": "Violet", "salePrice": "$21325.27", "city": "Joliet", "state": "Illinois", "zipCode": "60435"}
{"vin": "SCBGR3ZA1CC504502", "make": "Mercedes-Benz", "model": "E-Class", "year": 1986, "color": "Fuscia", "salePrice": "$81822.04", "city": "Pasadena", "state": "California", "zipCode": "91117"}

我能够创建KStream对象并观察它们,像这样:

KStream<byte[], UsedCars> usedCarsInputStream = 
builder.stream("used-car-colors", Consumed.with(Serdes.ByteArray(), new UsedCarsSerdes()));
//k, v => year, countof cars in year
KTable<String,Long> yearCount = usedCarsInputStream
.filter((k,v)->v.getYear() > 2010)
.selectKey((k,v) -> v.getVin())
.groupBy((key, value) -> Integer.toString(value.getYear()))
.count().toStream().print(Printed.<String, Long>toSysOut().withLabel("blah")); 

这当然给了我们一个记录的计数,按每年分组,大于2010年。然而,我想在下一步做的,但无法完成的是,简单地取每一年,就像在foreach中一样,并计算每年每种颜色的汽车数量。我尝试在yearCount.toStream()上编写foreach以进一步处理数据,但没有得到结果。

我正在寻找可能看起来像这样的输出:

{
"2011": [
{
"blue": "99",
"green": "243,",
"red": "33"
}
],
"2012": [
{
"blue": "74,",
"green": "432,",
"red": "2"
}
]
}

我相信我可能已经回答了我自己的问题。我欢迎其他人来评论我自己的解决方案。

我没有意识到的是,您可以对一个本质上是复合对象的对象执行GroupBy操作。在本例中,我需要以下SQL语句

的等价物
SELECT   year, color, count(*) FROM use_car_colors AS years 
GROUP BY year, color

在Kafka Streams中,你可以通过创建一个对象来实现这一点——在这种情况下,我创建了一个名为'YearColor'的POJO类,其中包含成员year和color——然后选择它作为Kafka Streams中的一个键:

usedCarsInputStream
.selectKey((k,v) -> new YearColor(v.getYear(), v.getColor()))
.groupByKey(Grouped.with(new YearColorSerdes(), new UsedCarsSerdes()))
.count()
.toStream()
.peek((yc, ct) -> System.out.println("year: " + yc.getYear() + " color: " + yc.getColor() 
+ " count: " + ct));

当然,您必须为此对象实现序列化器和反序列化器(我使用了YearColorSerdes())。当运行Kafka Streams应用程序时,我的输出为我提供了修改计数的更新,如:

year: 2012 color: Maroon count: 2
year: 2013 color: Khaki count: 1
year: 2012 color: Crimson count: 5
year: 2011 color: Pink count: 4
year: 2011 color: Green count: 2

这就是我要找的

最新更新