I'm new to Flink and have started a project in which I need to create window functions. My main code looks like this:
public class Main {
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9093");
        properties.setProperty("group.id", "test");

        Configuration conf = new Configuration();
        conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
        conf.setInteger(RestOptions.PORT, 8082);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

        // configuration hashmap
        HashMap<String, Integer> config = new HashMap<>();
        // add data to the config hashmap
        config.put("620467-DTC", 514);
        config.put("383069-DCDC_1", 64);
        System.out.println(config.toString());

        SingleOutputStreamOperator<MetricObject> stream = env
                .addSource(new FlinkKafkaConsumer<>("input_topic", new JsonDeserializationSchema(), properties))
                .flatMap(new MetricFilter(config));

        SingleOutputStreamOperator<Tuple2<String, Long>> windowedStream = stream
                .assignTimestampsAndWatermarks(new RecordWatermark().withTimestampAssigner(new ExtractRecordTimestamp()))
                .keyBy(new MetricGrouper())
                .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .process(new WindowedCount());

        windowedStream.print();
        // submit the job; nothing runs until execute() is called
        env.execute();
    }
}
The RecordWatermark class is implemented as follows:
public class RecordWatermark implements WatermarkStrategy<MetricObject> {
    @Override
    public TimestampAssigner<MetricObject> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
        return WatermarkStrategy.super.createTimestampAssigner(context);
    }

    @Override
    public WatermarkStrategy<MetricObject> withTimestampAssigner(SerializableTimestampAssigner<MetricObject> timestampAssigner) {
        return WatermarkStrategy.super.withTimestampAssigner(timestampAssigner);
    }

    @Override
    public WatermarkGenerator<MetricObject> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
        return new MetricWatermarksGenerator();
    }
}
The ExtractRecordTimestamp class has a single method, extractTimestamp, which extracts the timestamp value directly from the record:
public class ExtractRecordTimestamp implements SerializableTimestampAssigner<MetricObject> {
    @Override
    public long extractTimestamp(MetricObject element, long recordTimestamp) {
        Instant timestamp = element.getTimestamp();
        log.info("Event time: {}", DateTimeFormatter.ISO_INSTANT.format(timestamp));
        return timestamp.toEpochMilli();
    }
}
To create the watermarks, I implemented a class that implements the WatermarkGenerator interface:
public class MetricWatermarksGenerator implements WatermarkGenerator<MetricObject> {
    private final long maxOutOfOrderness = 5000; // 5 seconds
    private long currentMaxTimestamp;
    private long currentTime;

    @Override
    public void onEvent(MetricObject event, long eventTimestamp, WatermarkOutput output) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
        currentTime = System.nanoTime();
        final String eventTsString = DateTimeFormatter.ISO_INSTANT.format(Instant.ofEpochMilli(eventTimestamp));
        final String currentMaxTsString = DateTimeFormatter.ISO_INSTANT.format(Instant.ofEpochMilli(currentMaxTimestamp));
        log.info("Event timestamp: {}, maxTimestamp: {}", eventTsString, currentMaxTsString);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // emit the watermark as current highest timestamp minus the out-of-orderness bound
        output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
    }
}
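I produce messages to the Kafka topic with a simple Java producer, and I run the Kafka instance in Docker. The problem with this code is that the windowing is not working: the count function does not process any records. I tried adding some logging to the relevant classes; here is a sample of the output: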
19:29:27.425 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-05-19T15:32:04.027202Z
19:29:27.872 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO MetricFilter - Filtering event with timestamp '2021-04-21T16:25:52Z'
19:29:27.882 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO ExtractRecordTimestamp - Event time: 2021-04-21T16:25:52Z
19:29:27.902 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO MetricWatermarksGenerator - Event timestamp: 2021-04-21T16:25:52Z, maxTimestamp: 2021-04-21T16:25:52Z
19:29:28.302 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-04-21T16:25:55Z
19:29:28.302 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-05-19T15:31:59.596508Z
19:29:28.302 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO MetricFilter - Filtering event with timestamp '2021-04-21T16:25:55Z'
19:29:29.304 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-04-21T16:25:58Z
19:29:29.309 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-05-19T15:32:04.027202Z
19:29:29.311 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO MetricFilter - Filtering event with timestamp '2021-04-21T16:25:58Z'
19:29:29.311 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO ExtractRecordTimestamp - Event time: 2021-04-21T16:25:58Z
19:29:29.312 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO MetricWatermarksGenerator - Event timestamp: 2021-04-21T16:25:58Z, maxTimestamp: 2021-04-21T16:25:58Z
19:29:30.306 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-04-21T16:26:00Z
19:29:30.309 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-05-19T15:32:04.027202Z
19:29:30.310 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO MetricFilter - Filtering event with timestamp '2021-04-21T16:26:00Z'
19:29:30.311 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO ExtractRecordTimestamp - Event time: 2021-04-21T16:26:00Z
19:29:30.311 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO MetricWatermarksGenerator - Event timestamp: 2021-04-21T16:26:00Z, maxTimestamp: 2021-04-21T16:26:00Z
19:29:30.545 [Window(SlidingEventTimeWindows(10000, 5000), EventTimeTrigger, WindowedCount) -> Sink: Print to Std. Out (2/4)#0] INFO WindowedCount - watermark:2021-04-21T16:25:54.999Z
2> (620467,1)
19:29:31.316 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-04-21T16:25:26Z
19:29:31.319 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-05-19T15:31:59.596508Z
19:29:31.320 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO MetricFilter - Filtering event with timestamp '2021-04-21T16:25:26Z'
19:29:32.316 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-04-21T16:25:50Z
19:29:32.320 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-05-19T15:31:55.228960Z
19:29:32.325 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO MetricFilter - Filtering event with timestamp '2021-04-21T16:25:50Z'
19:29:32.326 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO ExtractRecordTimestamp - Event time: 2021-04-21T16:25:50Z
19:29:32.326 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO MetricWatermarksGenerator - Event timestamp: 2021-04-21T16:25:50Z, maxTimestamp: 2021-04-21T16:26:00Z
19:29:33.323 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-04-21T16:26:00Z
19:29:33.327 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-05-19T15:31:46.161843Z
19:29:33.331 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO MetricFilter - Filtering event with timestamp '2021-04-21T16:26:00Z'
19:29:34.329 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-04-21T16:27:57Z
19:29:34.332 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-05-19T15:31:41.633868Z
19:29:34.336 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO MetricFilter - Filtering event with timestamp '2021-04-21T16:27:57Z'
19:29:35.337 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-04-21T16:26:01Z
19:29:35.342 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] TRACE InstantTypeAdapter - Deserializing instant: 2021-05-19T15:31:37.399379Z
19:29:35.342 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO MetricFilter - Filtering event with timestamp '2021-04-21T16:26:01Z'
19:29:35.342 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO ExtractRecordTimestamp - Event time: 2021-04-21T16:26:01Z
19:29:35.342 [Legacy Source Thread - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (3/4)#0] INFO MetricWatermarksGenerator - Event timestamp: 2021-04-21T16:26:01Z, maxTimestamp: 2021-04-21T16:26:01Z
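It seems that only the first record is processed and the following ones are not taken into account. The input I'm using consists of these records: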
{"arguments":{"rootAssetId":620467,"samples":{"2021-04-21T16:25:52Z":{"16":{"DTC":{....
{"arguments":{"rootAssetId":610760,"samples":{"2021-04-21T16:25:55Z":{"16":{"DTC":{"513":...
{"arguments":{"rootAssetId":620467,"samples":{"2021-04-21T16:25:58Z":{"16":{"DTC":{"513":...
{"arguments":{"rootAssetId":620467,"samples":{"2021-04-21T16:26:00Z":{"16":{"DTC":{"513":...
{"arguments":{"rootAssetId":610760,"samples":{"2021-04-21T16:25:26Z":{"16":{"DTC":{"513":...
{"arguments":{"rootAssetId":383069,"samples":{"2021-04-21T16:25:50Z":{"6":{"DCDC_1":...
{"arguments":{"rootAssetId":610760,"samples":{"2021-04-21T16:26:00Z":{"6":{"DCDC_1":...
{"arguments":{"rootAssetId":620685,"samples":{"2021-04-21T16:27:57Z":{"6":{"DCDC_1":...
{"arguments":{"rootAssetId":383069,"samples":{"2021-04-21T16:26:01Z":{"6":{"DCDC_1":...
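The timestamp in each record is the event time that should be used. The records are filtered so that only those with rootAssetId 620467 and 383069 remain (the filtering logic is not important here).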
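When Flink's event time windows fail to produce results, it is either because
- the windows are empty, or
- no sufficiently large watermark has been generated.
In this case, reason #2 applies.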
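The data you supplied gets assigned to two overlapping windows, one for the interval 16:20:00 to 16:29:59.999 and another for the interval 16:25:00 to 16:34:59.999. To trigger the window that ends at 16:29:59.999, a watermark of at least 16:29:59.999 must arrive, indicating that the stream is now complete up through that timestamp.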
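How an event ends up in more than one sliding window can be sketched in plain Java, with no Flink dependency. This is a minimal sketch that follows the usual start-alignment arithmetic for sliding windows with a zero offset; the class name `SlidingWindowSketch` and the method `windowsFor` are made up for illustration and are not part of Flink. The size and slide are parameters; the values in `main` (ten minutes sliding by five) are chosen to reproduce the two intervals named above.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not a Flink class.
final class SlidingWindowSketch {

    /** Returns [start, end) in epoch millis for every sliding window that contains the timestamp. */
    static List<long[]> windowsFor(long timestamp, long size, long slide) {
        List<long[]> windows = new ArrayList<>();
        // Start of the most recent window containing the timestamp (offset assumed to be 0).
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        long ts = Instant.parse("2021-04-21T16:25:52Z").toEpochMilli();
        long size = Duration.ofMinutes(10).toMillis();
        long slide = Duration.ofMinutes(5).toMillis();
        for (long[] w : windowsFor(ts, size, slide)) {
            // Print the inclusive interval: start .. last millisecond inside the window.
            System.out.println(Instant.ofEpochMilli(w[0]) + " .. " + Instant.ofEpochMilli(w[1] - 1));
        }
    }
}
```

Passing the 10-second size and 5-second slide from the posted job instead gives the windows starting at 16:25:50 and 16:25:45 for the same event; the reasoning about when a window fires is the same in both cases.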
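For your watermark strategy to generate such a watermark, an event with a timestamp of 16:30:05 (or later) must appear in the input, because the watermark trails the largest timestamp seen so far by the 5-second out-of-orderness bound plus 1 ms. That does not appear to be happening.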
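The arithmetic behind that statement can be checked with a few lines of plain Java. This is only a sketch: `WatermarkMathSketch` and its two methods are hypothetical names, the formula is copied from the `onPeriodicEmit` method in the question, and it assumes an event-time window fires once the watermark reaches the last millisecond inside the window.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper, not a Flink class.
final class WatermarkMathSketch {

    /** Watermark emitted for a given largest-seen event timestamp (same formula as onPeriodicEmit). */
    static long watermarkFor(long maxTimestamp, long maxOutOfOrderness) {
        return maxTimestamp - maxOutOfOrderness - 1;
    }

    /** Smallest event timestamp that pushes the watermark far enough to fire a window ending at windowEndExclusive. */
    static long minEventTimestampToFire(long windowEndExclusive, long maxOutOfOrderness) {
        long windowMaxTimestamp = windowEndExclusive - 1; // last millisecond inside the window
        return windowMaxTimestamp + maxOutOfOrderness + 1;
    }

    public static void main(String[] args) {
        long bound = Duration.ofSeconds(5).toMillis();

        long windowEnd = Instant.parse("2021-04-21T16:30:00Z").toEpochMilli();
        System.out.println("event needed:      " + Instant.ofEpochMilli(minEventTimestampToFire(windowEnd, bound)));

        // Largest timestamp logged by MetricWatermarksGenerator in the question.
        long latestLogged = Instant.parse("2021-04-21T16:26:01Z").toEpochMilli();
        System.out.println("watermark reached: " + Instant.ofEpochMilli(watermarkFor(latestLogged, bound)));
    }
}
```

The largest timestamp that reaches the generator in the posted log is 16:26:01, so the watermark stops at 16:25:55.999. The record stamped 16:27:57 does not help, since it belongs to a rootAssetId that is filtered out before timestamps and watermarks are assigned.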
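Another way to get the windows to close is to make the input stream bounded. Whenever a bounded source reaches its end, it emits one final watermark, MAX_WATERMARK, just before the job shuts down, and this closes all remaining event time windows. In your tests you can do this either by using the newer KafkaSource (available in Flink 1.14) with its setBounded option, or by implementing a deserializer that returns true from its isEndOfStream method at some point.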
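The effect of that final watermark can be illustrated with a toy model in plain Java. This is a sketch under stated assumptions rather than Flink code: `FinalWatermarkSketch` and `fireUpTo` are invented names, pending windows are reduced to their exclusive end timestamps, and the final watermark is modelled as `Long.MAX_VALUE`, which is the timestamp carried by Flink's `MAX_WATERMARK`. In a real job the bound would come from the source itself, for example `setBounded(OffsetsInitializer.latest())` on the `KafkaSource` builder.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical toy model, not a Flink class.
final class FinalWatermarkSketch {

    /** Removes and returns the ends of all pending windows whose last millisecond is covered by the watermark. */
    static List<Long> fireUpTo(TreeSet<Long> pendingWindowEnds, long watermark) {
        List<Long> fired = new ArrayList<>();
        // A window with exclusive end E fires once the watermark reaches E - 1.
        while (!pendingWindowEnds.isEmpty() && pendingWindowEnds.first() - 1 <= watermark) {
            fired.add(pendingWindowEnds.pollFirst());
        }
        return fired;
    }

    public static void main(String[] args) {
        TreeSet<Long> pending = new TreeSet<>();
        pending.add(Instant.parse("2021-04-21T16:30:00Z").toEpochMilli());
        pending.add(Instant.parse("2021-04-21T16:35:00Z").toEpochMilli());

        // Watermark reached after the last logged event (16:26:01 minus 5 s minus 1 ms).
        long lastRegular = Instant.parse("2021-04-21T16:25:55.999Z").toEpochMilli();
        System.out.println("fired by regular watermark: " + fireUpTo(pending, lastRegular).size());

        // End of a bounded input: the final watermark covers every remaining window.
        System.out.println("fired by final watermark:   " + fireUpTo(pending, Long.MAX_VALUE).size());
    }
}
```

With the two windows discussed above, nothing fires at the regular watermark and both windows fire at the final one.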
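Your watermarking code can be simplified. I believe what you have written is equivalent to the following, which is easier to understand and maintain: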
    .assignTimestampsAndWatermarks(
        WatermarkStrategy
            .<MetricObject>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withTimestampAssigner((event, timestamp) ->
                event.getTimestamp().toEpochMilli()));
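To see what a bounded-out-of-orderness generator does with the events from your log, here is a plain-Java model of it. It is a sketch, not the Flink implementation: the class `BoundedOutOfOrdernessSketch` is a hypothetical stand-in that tracks the largest timestamp and reports `max - bound - 1` as the watermark, the same formula as your `onPeriodicEmit`. One difference from your generator is assumed here: the maximum starts at the lowest safe value instead of 0, so the watermark before the first event is `Long.MIN_VALUE`.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical stand-in for a bounded-out-of-orderness watermark generator.
final class BoundedOutOfOrdernessSketch {
    private final long boundMillis;
    private long maxTimestamp;

    BoundedOutOfOrdernessSketch(Duration bound) {
        this.boundMillis = bound.toMillis();
        // Start low enough that the initial watermark is Long.MIN_VALUE without underflow.
        this.maxTimestamp = Long.MIN_VALUE + boundMillis + 1;
    }

    /** Records an event; out-of-order events never lower the maximum. */
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** Watermark that a periodic emit would report right now. */
    long currentWatermark() {
        return maxTimestamp - boundMillis - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch generator = new BoundedOutOfOrdernessSketch(Duration.ofSeconds(5));
        // Event timestamps in the order MetricWatermarksGenerator logged them.
        String[] logged = {
            "2021-04-21T16:25:52Z", "2021-04-21T16:25:58Z", "2021-04-21T16:26:00Z",
            "2021-04-21T16:25:50Z", "2021-04-21T16:26:01Z"
        };
        for (String ts : logged) {
            generator.onEvent(Instant.parse(ts).toEpochMilli());
            System.out.println(ts + " -> watermark " + Instant.ofEpochMilli(generator.currentWatermark()));
        }
    }
}
```

After the 16:26:00 event the model reports 16:25:54.999, which matches the watermark that WindowedCount logged, and the late 16:25:50 event leaves it unchanged.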