Count of records in a 5s time window in a stream



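I'm trying to get the count of records in a stream within a 5s time window, but I always get a count of 1?? I'm sending in 10 records and expect the final count to be 10.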
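To make the expected result concrete, here is a minimal plain-Java model (not Flink API code) of counting records in 5-second tumbling windows. It assumes windows aligned to multiples of 5000 ms with no offset, and the class and method names are hypothetical. Ten records that all fall into one window produce a single count of 10, while the same number of records straddling a window boundary is split into two counts.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model of a tumbling-window count; not Flink code.
final class TumblingWindowCount {
    static final long WINDOW_SIZE_MS = 5_000L;

    // Start of the 5s window that contains ts, assuming windows aligned to
    // multiples of WINDOW_SIZE_MS (zero offset).
    static long windowStart(long ts) {
        return ts - Math.floorMod(ts, WINDOW_SIZE_MS);
    }

    // Counts records per window; the map key is the window start in ms.
    static Map<Long, Long> countPerWindow(List<Long> timestampsMs) {
        Map<Long, Long> counts = new TreeMap<>();
        for (long ts : timestampsMs) {
            counts.merge(windowStart(ts), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Ten records, 100 ms apart, all inside [10000, 15000).
        List<Long> sameWindow = List.of(10_000L, 10_100L, 10_200L, 10_300L, 10_400L,
                10_500L, 10_600L, 10_700L, 10_800L, 10_900L);
        System.out.println(countPerWindow(sameWindow)); // {10000=10}

        // Ten records that straddle the 15000 ms window boundary.
        List<Long> straddling = List.of(14_500L, 14_600L, 14_700L, 14_800L, 14_900L,
                15_000L, 15_100L, 15_200L, 15_300L, 15_400L);
        System.out.println(countPerWindow(straddling)); // {10000=5, 15000=5}
    }
}
```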

I tried following Fabian Hueske's suggestion in "How to count the number of records processed by Apache Flink in a given time window".

DataStream<Map<String, Object>> kinesisStream;
...//get data from Kinesis source into kinesisStream - works fine
final SingleOutputStreamOperator<Map<String, Object>> filterDroppedEvents = kinesisStream
        .filter(resultMap -> {
            long timestamp = Utils.getEventTimestampFromMap(resultMap);
            long currTimestamp = System.currentTimeMillis();
            long driftFromCurrTS = currTimestamp - timestamp;
            if (driftFromCurrTS < 0) {
                Object eventNameObj = resultMap.get(EVENT_NAME);
                String eventName = eventNameObj != null ? (String) eventNameObj : "";
                logger.debug("PMS - event_timestamp is > current timestamp by driftFromCurrTS:{} for event_name:{} and event_timestamp:{}", driftFromCurrTS, eventName, timestamp);
                return true;
            } else {
                return false;
            }
        });//called 10 times here - GOOD
final SingleOutputStreamOperator<CountRows> droppedEventsMapToCountRows = filterDroppedEvents
        .map(mapValue -> new CountRows(mapValue, 1L, mapValue.get(EVENT_NAME) != null ? (String) mapValue.get(EVENT_NAME) : ""));//this is called 10 times - GOOD
final KeyedStream<CountRows, String> countRowsKeyedStream = droppedEventsMapToCountRows.keyBy(new KeySelector<CountRows, String>() {
    @Override
    public String getKey(CountRows countRows) throws Exception {
        logger.info("Inside getKey");
        return countRows.getEventName();
    }
});//doesn't get in here to this logger statement ??
final AllWindowedStream<CountRows, TimeWindow> countRowsTimeWindowAllWindowedStream = countRowsKeyedStream
        .timeWindowAll(org.apache.flink.streaming.api.windowing.time.Time.seconds(5));
//.sum("count")
final SingleOutputStreamOperator<CountRows> countRowsReduceStream = countRowsTimeWindowAllWindowedStream.reduce((accum, input) -> {
    logger.info("Inside reduce");
    return new CountRows(input.getRow(), accum.getCount() + input.getCount(), input.getEventName());// sum 1s to count
});//don't see this logger statement "Inside reduce"
DataStream<InfluxDBPoint> droppedEventsStream =
        countRowsReduceStream.flatMap(new FlatMapFunction<CountRows, InfluxDBPoint>() {
            @Override
            public void flatMap(CountRows countRows, Collector<InfluxDBPoint> out) throws Exception {
                logger.info("Inside final map"); // only called once and countRows.getCount() is 1 - BAD - want it to be 10 ??
                Map<String, Object> mapValue = countRows.getRow();
                //long currTimestamp = System.currentTimeMillis();
                Object eventTSObj = mapValue.get(EVENT_TIMESTAMP);
                String eventTimestamp = eventTSObj != null ? (String) eventTSObj : "";
                long eventTS = Utils.getLongFromDateStr(eventTimestamp);
                Map<String, String> tags = new HashMap<>();
                Object eventNameObj = mapValue.get(Utils.EVENT_NAME);
                String eventName = eventNameObj != null ? (String) eventNameObj : "";
                tags.put(Utils.EVENT_NAME, eventName);
                Map<String, Object> fields = new HashMap<>();
                fields.put("count", countRows.getCount());
                out.collect(new InfluxDBPoint("dropped_events_count", eventTS, tags, fields));//TODO: measurement name
            }
        });
/* Tried map but doesn't work
reduceStream.map(countRows -> {
    logger.info("Inside final map");
    Map<String, Object> mapValue = countRows.getRow();
    //long currTimestamp = System.currentTimeMillis();
    Object eventTSObj = mapValue.get(EVENT_TIMESTAMP);
    String eventTimestamp = eventTSObj != null ? (String) eventTSObj : "";
    long eventTS = Utils.getLongFromDateStr(eventTimestamp);
    Map<String, String> tags = new HashMap<>();
    Object eventNameObj = mapValue.get(Utils.EVENT_NAME);
    String eventName = eventNameObj != null ? (String) eventNameObj : "";
    tags.put(Utils.EVENT_NAME, eventName);
    Map<String, Object> fields = new HashMap<>();
    fields.put("count", countRows.getCount());
    return new InfluxDBPoint("dropped_events_count", eventTS, tags, fields);//TODO: measurement name
});*/
droppedEventsStream.addSink(influxSink);

@@@@@@@@@@@@@@@@@@@@@@@@@@@@@

CountRows is a wrapper around Map<String, Object> that adds a count:

public static class CountRows implements Serializable, Comparable<CountRows> {
    Map<String, Object> row;
    Long count;
    String eventName;
    //default constructor and constructor with 3 attributes
}
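For reference, here is a self-contained sketch of what the rest of CountRows might look like, based only on the fields and getters the pipeline above uses. The compareTo ordering (by count), the sample event name "login", and making it a package-private top-level class (so the sketch compiles on its own) are assumptions; in a real job, Flink's POJO rules call for a public class with a public no-arg constructor and getters/setters. The combine method repeats the question's reduce lambda, and folding ten count-1 records with it yields 10, which is what the window reduce should emit if all ten records land in one window.

```java
import java.io.Serializable;
import java.util.Map;

// Hypothetical completion of CountRows; field names follow the question,
// the compareTo ordering by count is an assumption.
class CountRows implements Serializable, Comparable<CountRows> {
    private Map<String, Object> row;
    private Long count;
    private String eventName;

    // Default constructor, as mentioned in the question.
    public CountRows() {}

    public CountRows(Map<String, Object> row, Long count, String eventName) {
        this.row = row;
        this.count = count;
        this.eventName = eventName;
    }

    public Map<String, Object> getRow() { return row; }
    public void setRow(Map<String, Object> row) { this.row = row; }
    public Long getCount() { return count; }
    public void setCount(Long count) { this.count = count; }
    public String getEventName() { return eventName; }
    public void setEventName(String eventName) { this.eventName = eventName; }

    // Assumed ordering: by count only (throws if either count is null).
    @Override
    public int compareTo(CountRows other) {
        return Long.compare(count, other.count);
    }

    // Same logic as the reduce lambda in the question: keep the latest
    // row and event name, add the counts.
    static CountRows combine(CountRows accum, CountRows input) {
        return new CountRows(input.getRow(), accum.getCount() + input.getCount(), input.getEventName());
    }

    public static void main(String[] args) {
        // Ten count-1 records, as produced by the map step; rows left empty.
        CountRows result = new CountRows(Map.of(), 1L, "login");
        for (int i = 1; i < 10; i++) {
            result = combine(result, new CountRows(Map.of(), 1L, "login"));
        }
        System.out.println(result.getCount()); // 10
    }
}
```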

TIA,

有一点很突出,那就是您将timeWindowAllKeyedStream一起使用。API的这一部分不应该这样使用。如果您希望计算所有密钥的全局计数,则删除keyBy;如果要分别计算每个键的事件,请保留keyBy,并使用timeWindow而不是timeWindowAll

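I also see that you have event timestamps, but you don't seem to be using event-time windows (I don't see a timestamp assigner or a watermark generator). I don't know whether that is intentional, or whether it might be related to the results not matching your expectations.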
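To show why event-time windows depend on timestamps and watermarks, here is a minimal plain-Java model (not Flink code; names and sample timestamps are hypothetical). It assumes a bounded-out-of-orderness watermark equal to the highest timestamp seen minus the allowed delay minus 1, zero allowed lateness, and a window [start, end) that fires once the watermark reaches end - 1. As a simplification it advances the watermark after every event rather than on a periodic timer. Without a watermark, no window in this model would ever fire.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of event-time tumbling windows driven by a
// bounded-out-of-orderness watermark; not Flink code.
final class EventTimeWindowModel {
    static final long WINDOW_SIZE_MS = 5_000L;

    private final long maxOutOfOrdernessMs;
    private long maxTimestamp = Long.MIN_VALUE;
    private long watermark = Long.MIN_VALUE;
    // Open windows: window start -> count so far.
    private final Map<Long, Long> openWindows = new TreeMap<>();
    // Emitted results such as "[0,5000)=3", in firing order.
    final List<String> fired = new ArrayList<>();
    // Events whose window had already fired (allowed lateness assumed 0).
    int dropped = 0;

    EventTimeWindowModel(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    void onEvent(long eventTs) {
        long start = eventTs - Math.floorMod(eventTs, WINDOW_SIZE_MS);
        if (start + WINDOW_SIZE_MS - 1 <= watermark) {
            dropped++;
        } else {
            openWindows.merge(start, 1L, Long::sum);
        }
        // Watermark = highest timestamp seen - allowed out-of-orderness - 1.
        maxTimestamp = Math.max(maxTimestamp, eventTs);
        watermark = maxTimestamp - maxOutOfOrdernessMs - 1;
        fireReadyWindows();
    }

    // At the end of a bounded input, a final Long.MAX_VALUE watermark flushes all windows.
    void endOfInput() {
        watermark = Long.MAX_VALUE;
        fireReadyWindows();
    }

    // Fires every open window whose last millisecond is <= the watermark.
    private void fireReadyWindows() {
        Iterator<Map.Entry<Long, Long>> it = openWindows.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> w = it.next();
            long start = w.getKey();
            if (start + WINDOW_SIZE_MS - 1 <= watermark) {
                fired.add("[" + start + "," + (start + WINDOW_SIZE_MS) + ")=" + w.getValue());
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        EventTimeWindowModel model = new EventTimeWindowModel(1_000L);
        // Arrival order; 3_000 arrives after the watermark has passed 4_999.
        for (long ts : new long[] {1_000L, 4_000L, 2_000L, 5_500L, 6_100L, 3_000L, 9_000L}) {
            model.onEvent(ts);
        }
        model.endOfInput();
        System.out.println(model.fired + " dropped=" + model.dropped);
        // [[0,5000)=3, [5000,10000)=3] dropped=1
    }
}
```

In an actual Flink job the watermark side is configured on the stream rather than hand-written: depending on the Flink version, that means passing WatermarkStrategy.forBoundedOutOfOrderness(...) with withTimestampAssigner(...) to assignTimestampsAndWatermarks, or, on older versions like the one using timeWindowAll here, a BoundedOutOfOrdernessTimestampExtractor together with the event-time characteristic.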
