时间戳和水印的时间尺度不同



我有一个传感器数据流,从now()开始,每秒发出数据,但它们的时间戳增加了15分钟。比方说现在是19:00:00我们输入

('TH1', '2023-01-17 19:00', 15.559) at 19:00:00 
('TH1', '2023-01-17 19:15', 12.008) at 19:00:01 
('TH1', '2023-01-17 19:30', 15.706) at 19:00:02 

等。由于我知道延迟数据将以x模拟天的BoundedOutOfOrderness到达,即实时24*(60/15)*x秒,由于未来的时间戳,我正在努力实现水印策略和TimestampAssigner。目标是将整个模拟日(即96秒)的事件相加。

到目前为止,我已经尝试用TumblingProcessingTimeWindows触发聚合器,但我认为用EventTime会更好。BoundedOutOfOrderness和TumblingEvent窗口的一些组合似乎不起作用。我是flink的新手,我正在尝试1.6。据我所知,没有太多的代码,因为这个水印的东西是这个新版本。

下面是我认为可以工作的代码

KafkaSource<SensorMessage> consumer = KafkaSource.<SensorMessage>builder()
.setBootstrapServers(parameterTool.get("bootstrap.servers"))
.setTopics(String.format("sensors.%s", topic))
.setGroupId(parameterTool.get("group.id"))
.setValueOnlyDeserializer(new SensorMessageDeserializationSchema()).build();
SingleOutputStreamOperator<SensorMessage> soso = env.fromSource(consumer, 
WatermarkStrategy.
<SensorMessage>forBoundedOutOfOrderness(Duration.ofSeconds(3*96)) // 3 days
.withTimestampAssigner(((sensorMessage, l) -> sensorMessage.timestamp)), 
topic);
soso.keyBy(s -> s.name)
.window(TumblingEventTimeWindows.of(Time.seconds(96)))// sum 1 day
.sum("value")
.map(s -> s.name + ", " + s.value)
.sinkTo(KafkaSink.<String>builder()
.setBootstrapServers(
parameterTool
.getProperties()
.getProperty("bootstrap.servers"))
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("flink.output")
.setValueSerializationSchema(new SimpleStringSchema())
.build())
.build());

我想你把事情搞混了。

ProcessingTime是由系统时钟测量的时间,因此在您的情况下,1个模拟日将有96秒。

EventTime另一方面不使用系统时钟,所有的时间流是衡量基于timestamp你收到的事件,这意味着即使你接受一整天的模拟在96秒,从事件时间的角度来看,整个天已经过去了,所以在定义秩序和窗口大小,您需要定义事件时间的角度如。你想要一个24小时的窗口大小不是96秒。

最新更新