Apache Flink - 使用 EventTimeSessionWindows 处理来自 Kinesis 的 poj



我在使用来自 AWS Kinesis 的 json 消息时尝试使用 EventTimeSessionWindows。
到目前为止,我拥有的:

DataStream<SamplePojo> kinesis = env.addSource(new FlinkKinesisConsumer<>(
            "my-stream",
            new POJODeserializationSchema(),
            kinesisConsumerConfig));

DataStream<SamplePojo> aggregated = kinesis
            .keyBy("someProperty1")
            .window(EventTimeSessionWindows.withGap(Time.seconds(2L)))
            .sum("indicator");

//kinesis.print();
aggregated.print();
env.execute(); 

POJODeserializationSchema 就像在 Apache Flink 中一样 - 如何使用 AWS Kinesis 发送和使用 POJO

这依赖于接收 Tuple3 的文档的基本示例:

DataStream<Tuple3<String, Long, Integer>> aggregated = source
        .keyBy(0)
        .window(EventTimeSessionWindows.withGap(Time.milliseconds(3L)))
        .sum(2);

aggregated似乎是空的..有什么想法吗?
(kinesis.print()确实显示了所有投入 Kinesis 的消息(

您必须为流提供时间戳和水印,如下所示。

像这样:

DataStream<Tuple3<String, Long, Integer>> aggregated = source
    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<SamplePojo>() {...})
    .keyBy(0)
    .window(EventTimeSessionWindows.withGap(Time.milliseconds(3L)))
    .sum(2);

另请注意,您必须启用TimeCharacterist.EventTime

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

让它与:

 DataStream<SamplePojo> aggregated = kinesis.assignTimestampsAndWatermarks((new AscendingTimestampExtractor<SamplePojo>() {
                @Override
                public long extractAscendingTimestamp(SamplePojo samplePojo) {
                    return samplePojo.getSomeProperty2();
                }
            }));
 aggregated
       .keyBy((event) -> event.getSomeProperty1())
       .timeWindow(Time.seconds(1));
 aggregated.print();

感谢Dawid Wysakowicz的有用链接

相关内容

  • 没有找到相关文章

最新更新