如何实现类似的sql:用kafka流从x组中选择a,count(distinct(b))



我的想法是:

首先groupByKey,然后ip,device是唯一的,然后map[ip,device]ip是密钥device是值。GroupByKey再说一遍,我认为计数值是ip对应的设备数量。

卡夫卡记录是

密钥值(ip,deviceId(

1 127.0.0.1,aa bb cc

2 127.0.0.1,aa bb cc

3 127.0.0.1,aa bb cc

(更多,但所有值均为127.0.0.1,aa bb cc(

我想在跳跃时间窗口中获得ip拥有的设备ID的数量。

代码:

final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> records = builder.stream(topic);
KStream<String, String> formatRecoed = records.map(new KeyValueMapper<String, String, KeyValue<String, String>>(){
@Override
public KeyValue<String, String> apply(String key, String value) {
return new KeyValue<>(value, key);
}
}
formatRecoed.groupByKey().windowedBy(TimeWindows.of(1000 * 60).advanceBy(1000 * 6).until(1000 * 60)).count().toStream(new KeyValueMapper<Windowed<String>, Long, String>(){
@Override
public String apply(Windowed<String> key, Long value) {
return key.toString();
}
}).map(new KeyValueMapper<String, Long, KeyValue<String, String>>(){
@Override
public KeyValue<String, String> apply(String key, Long value) {
String[] keys = key.split(",");
return new KeyValue<>(keys[0], keys[1]);
}
}).groupByKey().windowedBy(TimeWindows.of(1000 * 60).advanceBy(1000 * 6).until(1000 * 60)).count().toStream(new KeyValueMapper<Windowed<String>, Long, String>(){
@Override
public String apply(Windowed<String> key, Long value) {
return key.toString();
}
}).map(new KeyValueMapper<String, Long, KeyValue<String, String>>(){
@Override
public KeyValue<String, String> apply(String key, Long value) {
return new KeyValue<>(key, "" + value);
}
}).to("topic");

预期结果每个时间窗口都是

密钥 nbsp nbsp 值

127.0.0.1@1543495068000/1543495188000 1

127.0.0.1@1543495074000/1543495194000 1

127.0.0.1@1543495080000/1543495200000 1

但我的运行结果是:

127.0.0.1@1543495068000/1543495188000 3

127.0.0.1@1543495074000/1543495194000 4

127.0.0.1@1543495080000/1543495200000 1

为什么

我期待着有人帮助我。

代码中有两个窗口,这可能是问题的原因。我建议这个流程:

records.map((key, value) -> {
String[] data = value.split(",");
return KeyValue.pair(data[0], data[1]);
})
.groupByKey() // by IP
.windowedBy(TimeWindows.of(1000 * 60).advanceBy(1000 * 6).until(1000 * 60))
.reduce((device1, device2) -> device1 + "|" + device2)
.toStream() // stream of lists of devices per IP in window
.mapValues(devices -> new HashSet<>(Arrays.asList(devices.split("|"))) // set of devices
.mapValues(set -> set.size().toString())

结果KStream是(IP, count(distinct(devices)))(两个字符串(的窗口流,所以您可以将其转发到其他主题。该方法假设设备名称(|(中不存在一个字符,如果没有,则需要更改序列化方法。

相关内容

  • 没有找到相关文章

最新更新