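My idea is:
First groupByKey, so that each (ip, device) pair is unique; then map [ip,device] so that ip is the key and device is the value; then groupByKey again. I expected the resulting count to be the number of devices for each ip.
The Kafka records are:
key value (ip,deviceId)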
1 127.0.0.1,aa bb cc
2 127.0.0.1,aa bb cc
3 127.0.0.1,aa bb cc
(there are more, but every value is 127.0.0.1,aa bb cc)
I want to get the number of device IDs each ip has within a hopping time window.
Code:
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> records = builder.stream(topic);
KStream<String, String> formatRecoed = records.map(new KeyValueMapper<String, String, KeyValue<String, String>>(){
@Override
public KeyValue<String, String> apply(String key, String value) {
return new KeyValue<>(value, key);
}
});
formatRecoed.groupByKey().windowedBy(TimeWindows.of(1000 * 60).advanceBy(1000 * 6).until(1000 * 60)).count().toStream(new KeyValueMapper<Windowed<String>, Long, String>(){
@Override
public String apply(Windowed<String> key, Long value) {
return key.toString();
}
}).map(new KeyValueMapper<String, Long, KeyValue<String, String>>(){
@Override
public KeyValue<String, String> apply(String key, Long value) {
String[] keys = key.split(",");
return new KeyValue<>(keys[0], keys[1]);
}
}).groupByKey().windowedBy(TimeWindows.of(1000 * 60).advanceBy(1000 * 6).until(1000 * 60)).count().toStream(new KeyValueMapper<Windowed<String>, Long, String>(){
@Override
public String apply(Windowed<String> key, Long value) {
return key.toString();
}
}).map(new KeyValueMapper<String, Long, KeyValue<String, String>>(){
@Override
public KeyValue<String, String> apply(String key, Long value) {
return new KeyValue<>(key, "" + value);
}
}).to("topic");
The expected result for each time window is:
key value
127.0.0.1@1543495068000/1543495188000 1
127.0.0.1@1543495074000/1543495194000 1
127.0.0.1@1543495080000/1543495200000 1
But the result I actually get is:
127.0.0.1@1543495068000/1543495188000 3
127.0.0.1@1543495074000/1543495194000 4
127.0.0.1@1543495080000/1543495200000 1
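Why?
I would appreciate any help.
The code has two windowed aggregations, which is probably the cause of the problem. I suggest this flow instead: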
// needs java.util.Arrays and java.util.HashSet
records.map((key, value) -> {
    // value is "ip,deviceId"; re-key the record by IP
    String[] data = value.split(",");
    return KeyValue.pair(data[0], data[1]);
})
.groupByKey() // by IP
.windowedBy(TimeWindows.of(1000 * 60).advanceBy(1000 * 6).until(1000 * 60))
.reduce((device1, device2) -> device1 + "|" + device2)
.toStream() // stream of lists of devices per IP in window
// set of devices; split() takes a regex, so the "|" must be escaped
.mapValues(devices -> new HashSet<>(Arrays.asList(devices.split("\\|"))))
.mapValues(set -> String.valueOf(set.size()));
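The result is a windowed KStream of (IP, count(distinct(devices))) pairs, both Strings, so you can forward it to another topic. This approach assumes that device names never contain the | character; if they can, you will need a different serialization method.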
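To make the concatenate-then-deduplicate step concrete, here is a minimal plain-Java sketch that runs without Kafka. The class name DistinctDeviceCount and the helpers merge and countDistinct are hypothetical names chosen for illustration; they only mimic what the reduce and mapValues steps do to the values of one window. Note that String.split takes a regular expression, so the delimiter has to be quoted or escaped.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Pattern;

public class DistinctDeviceCount {
    // Assumed delimiter, matching the "|" used in the reduce step.
    static final String DELIMITER = "|";

    // Mirrors the reducer: joins two device IDs (or partial lists) into one string.
    static String merge(String devices1, String devices2) {
        return devices1 + DELIMITER + devices2;
    }

    // Splits the joined string on the literal delimiter and counts the unique IDs.
    static int countDistinct(String devices) {
        // Pattern.quote turns "|" into a literal; unquoted it is regex alternation.
        Set<String> unique =
                new HashSet<>(Arrays.asList(devices.split(Pattern.quote(DELIMITER))));
        return unique.size();
    }

    public static void main(String[] args) {
        // Three identical records for one IP, as in the question.
        String merged = merge(merge("aa bb cc", "aa bb cc"), "aa bb cc");
        System.out.println(countDistinct(merged)); // prints 1

        // A second, different device raises the distinct count.
        System.out.println(countDistinct(merge(merged, "dd ee ff"))); // prints 2
    }
}
```

The value string grows with every record in the window, so this is only reasonable when the number of records per IP and window is small.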