我有一些来自API到Kafka主题的随机数据,看起来像这样:
{"vin": "1N6AA0CA7CN040747", "make": "Nissan", "model": "Pathfinder", "year": 1993, "color": "Blue", "salePrice": "$58312.28", "city": "New York City", "state": "New York", "zipCode": "10014"}
{"vin": "1FTEX1C88AF678435", "make": "Audi", "model": "200", "year": 1991, "color": "Aquamarine", "salePrice": "$65651.53", "city": "Newport Beach", "state": "California", "zipCode": "92662"}
{"vin": "JN8AS1MU1BM237985", "make": "Subaru", "model": "Legacy", "year": 1990, "color": "Violet", "salePrice": "$21325.27", "city": "Joliet", "state": "Illinois", "zipCode": "60435"}
{"vin": "SCBGR3ZA1CC504502", "make": "Mercedes-Benz", "model": "E-Class", "year": 1986, "color": "Fuscia", "salePrice": "$81822.04", "city": "Pasadena", "state": "California", "zipCode": "91117"}
我能够创建KStream
对象并观察它们,像这样:
KStream<byte[], UsedCars> usedCarsInputStream =
builder.stream("used-car-colors", Consumed.with(Serdes.ByteArray(), new UsedCarsSerdes()));
//k, v => year, countof cars in year
KTable<String,Long> yearCount = usedCarsInputStream
.filter((k,v)->v.getYear() > 2010)
.selectKey((k,v) -> v.getVin())
.groupBy((key, value) -> Integer.toString(value.getYear()))
.count().toStream().print(Printed.<String, Long>toSysOut().withLabel("blah"));
这当然给了我们一个记录的计数,按每年分组,大于2010年。然而,我想在下一步做的,但无法完成的是,简单地取每一年,就像在foreach
中一样,并计算每年每种颜色的汽车数量。我尝试在yearCount.toStream()
上编写foreach
以进一步处理数据,但没有得到结果。
我正在寻找可能看起来像这样的输出:
{
"2011": [
{
"blue": "99",
"green": "243,",
"red": "33"
}
],
"2012": [
{
"blue": "74,",
"green": "432,",
"red": "2"
}
]
}
我相信我可能已经回答了我自己的问题。我欢迎其他人来评论我自己的解决方案。
我没有意识到的是,您可以对一个本质上是复合对象的对象执行GroupBy操作。在本例中,我需要以下SQL语句
的等价物SELECT year, color, count(*) FROM use_car_colors AS years
GROUP BY year, color
在Kafka Streams中,你可以通过创建一个对象来实现这一点——在这种情况下,我创建了一个名为'YearColor'的POJO类,其中包含成员year和color——然后选择它作为Kafka Streams中的一个键:
usedCarsInputStream
.selectKey((k,v) -> new YearColor(v.getYear(), v.getColor()))
.groupByKey(Grouped.with(new YearColorSerdes(), new UsedCarsSerdes()))
.count()
.toStream()
.peek((yc, ct) -> System.out.println("year: " + yc.getYear() + " color: " + yc.getColor()
+ " count: " + ct));
当然,您必须为此对象实现序列化器和反序列化器(我使用了YearColorSerdes())。当运行Kafka Streams应用程序时,我的输出为我提供了修改计数的更新,如:
year: 2012 color: Maroon count: 2
year: 2013 color: Khaki count: 1
year: 2012 color: Crimson count: 5
year: 2011 color: Pink count: 4
year: 2011 color: Green count: 2
这就是我要找的