I'm trying to use EventTimeSessionWindows while consuming JSON messages from AWS Kinesis.
What I have so far:
DataStream<SamplePojo> kinesis = env.addSource(new FlinkKinesisConsumer<>(
        "my-stream",
        new POJODeserializationSchema(),
        kinesisConsumerConfig));
DataStream<SamplePojo> aggregated = kinesis
        .keyBy("someProperty1")
        .window(EventTimeSessionWindows.withGap(Time.seconds(2L)))
        .sum("indicator");
//kinesis.print();
aggregated.print();
env.execute();
POJODeserializationSchema is the same as in "Apache Flink - how to send and consume POJOs using AWS Kinesis".
This is based on the basic example from the documentation, which takes a Tuple3:
DataStream<Tuple3<String, Long, Integer>> aggregated = source
        .keyBy(0)
        .window(EventTimeSessionWindows.withGap(Time.milliseconds(3L)))
        .sum(2);
But aggregated seems to be empty. Any ideas?
(kinesis.print() does show all the messages that were put into Kinesis.)
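You have to assign timestamps and watermarks to the stream. An event-time window only fires once the watermark passes the end of the window, so without them nothing is ever emitted.
Like this: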
DataStream<Tuple3<String, Long, Integer>> aggregated = source
        .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<SamplePojo>() {...})
        .keyBy(0)
        .window(EventTimeSessionWindows.withGap(Time.milliseconds(3L)))
        .sum(2);
Also note that you have to enable TimeCharacteristic.EventTime:
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
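To make the reasoning concrete, here is a minimal, Flink-free sketch of why the aggregated stream stays empty. It is a simplified model, not Flink's implementation: the class SessionWindowSketch and its methods are hypothetical names made up for illustration. It assumes a session ends at "last timestamp + gap" and that a window fires once the watermark reaches the window's end minus one millisecond.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified, Flink-free model of event-time session windows for a single key. */
final class SessionWindowSketch {

    /** One session: covers [start, end) and carries the sum of its values. */
    static final class Session {
        final long start;
        long end;
        int sum;

        Session(long start, long end, int sum) {
            this.start = start;
            this.end = end;
            this.sum = sum;
        }
    }

    /**
     * Builds sessions from timestamps given in ascending order. An element
     * extends the current session if its timestamp is no later than that
     * session's end (last timestamp + gap); otherwise it opens a new session.
     */
    static List<Session> sessions(long[] timestamps, int[] values, long gapMillis) {
        List<Session> result = new ArrayList<>();
        Session current = null;
        for (int i = 0; i < timestamps.length; i++) {
            long ts = timestamps[i];
            if (current != null && ts <= current.end) {
                current.end = ts + gapMillis;
                current.sum += values[i];
            } else {
                current = new Session(ts, ts + gapMillis, values[i]);
                result.add(current);
            }
        }
        return result;
    }

    /** Returns the sums of all sessions whose last millisecond the watermark has reached. */
    static List<Integer> fired(List<Session> sessions, long watermark) {
        List<Integer> out = new ArrayList<>();
        for (Session s : sessions) {
            if (watermark >= s.end - 1) {
                out.add(s.sum);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] timestamps = {0L, 500L, 1000L, 5000L, 5500L};
        int[] values = {1, 1, 1, 1, 1};
        List<Session> sessions = sessions(timestamps, values, 2000L);

        // No timestamps/watermarks assigned: the watermark never advances.
        System.out.println(fired(sessions, Long.MIN_VALUE)); // prints []

        // Watermark just behind the latest event (5500 - 1).
        System.out.println(fired(sessions, 5499L)); // prints [3]
    }
}
```

In this model the second session (sum 2) is still open at watermark 5499, which matches the general observation that the last session is only emitted after later events push the watermark past its end.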
Got it working with:
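// Assign event-time timestamps (and the watermarks derived from them) from the POJO.
DataStream<SamplePojo> aggregated = kinesis.assignTimestampsAndWatermarks(
        new AscendingTimestampExtractor<SamplePojo>() {
            @Override
            public long extractAscendingTimestamp(SamplePojo samplePojo) {
                // Assumes someProperty2 holds an epoch timestamp in milliseconds.
                return samplePojo.getSomeProperty2();
            }
        });
// Note: this windowed stream has no aggregation applied and its result is
// not assigned, so print() below shows the timestamped input stream.
aggregated
        .keyBy((event) -> event.getSomeProperty1())
        .timeWindow(Time.seconds(1));
aggregated.print();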
Thanks to Dawid Wysakowicz for the useful link.
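For readers wondering what an ascending-timestamp extractor contributes beyond the timestamp itself, here is a rough, Flink-free sketch of the idea. It is an illustration under assumptions, not the library's class: AscendingWatermarkSketch and its methods are hypothetical names, and the model assumes the watermark trails the highest timestamp seen so far by one millisecond and that a timestamp going backwards is only counted, not corrected.

```java
/** Simplified model of deriving watermarks from timestamps that are expected to ascend. */
final class AscendingWatermarkSketch {

    private long currentTimestamp = Long.MIN_VALUE;
    private int violations = 0;

    /** Records an element's timestamp; a timestamp that goes backwards counts as a violation. */
    long onElement(long timestamp) {
        if (timestamp >= currentTimestamp) {
            currentTimestamp = timestamp;
        } else {
            violations++;
        }
        return timestamp;
    }

    /** Watermark is one millisecond behind the highest timestamp seen, or MIN_VALUE before any element. */
    long currentWatermark() {
        return currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1;
    }

    int violations() {
        return violations;
    }

    public static void main(String[] args) {
        AscendingWatermarkSketch sketch = new AscendingWatermarkSketch();
        sketch.onElement(1000L);
        sketch.onElement(900L);  // out of order: counted, watermark unchanged
        sketch.onElement(1500L);
        System.out.println(sketch.currentWatermark()); // prints 1499
        System.out.println(sketch.violations());       // prints 1
    }
}
```

If the Kinesis records can arrive out of order with respect to someProperty2, an ascending extractor is probably too strict and a watermark strategy that tolerates bounded lateness would be a safer choice.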