如何使用Flink SQL按事件时间对流进行分类



我有一个我想排序的排序 DataStream<Event>,以便通过其事件时间时间戳来订购事件。我将用例简化为事件类只有一个字段的位置-timestamp字段:

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.setParallelism(1);
    DataStream<Event> eventStream = env.addSource(new OutOfOrderEventSource())
            .assignTimestampsAndWatermarks(new TimestampsAndWatermarks());
    Table events = tableEnv.fromDataStream(eventStream, "timestamp.rowtime");
    tableEnv.registerTable("events", events);
    Table sorted = tableEnv.sqlQuery("SELECT timestamp FROM events ORDER BY eventTime ASC");
    DataStream<Row> sortedEventStream = tableEnv.toAppendStream(sorted, Row.class);
    sortedEventStream.print();
    env.execute();
}

我遇到了这个错误:

线程"主"中的例外 org.apache.flink.table.api.sqlparserexception:SQL Parse失败。 遇到了第1列第1行的"时间戳"。

似乎我没有以正确的方式指定事件时间属性,但尚不清楚怎么了。

问题是将timestamp用作我的事件类中的字段名称。将其更改为eventTime足以使所有内容都有效:

public class Sort {
    public static final int OUT_OF_ORDERNESS = 1000;
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        DataStream<Event> eventStream = env.addSource(new OutOfOrderEventSource())
                .assignTimestampsAndWatermarks(new TimestampsAndWatermarks());
        Table events = tableEnv.fromDataStream(eventStream, "eventTime.rowtime");
        tableEnv.registerTable("events", events);
        Table sorted = tableEnv.sqlQuery("SELECT eventTime FROM events ORDER BY eventTime ASC");
        DataStream<Row> sortedEventStream = tableEnv.toAppendStream(sorted, Row.class);
        sortedEventStream.print();
        env.execute();
    }
    public static class Event {
        public Long eventTime;
        Event() {
            this.eventTime = Instant.now().toEpochMilli() + (new Random().nextInt(OUT_OF_ORDERNESS));
        }
    }
    private static class OutOfOrderEventSource implements SourceFunction<Event> {
        private volatile boolean running = true;
        @Override
        public void run(SourceContext<Event> ctx) throws Exception {
            while(running) {
                ctx.collect(new Event());
                Thread.sleep(1);
            }
        }
        @Override
        public void cancel() {
            running = false;
        }
    }
    private static class TimestampsAndWatermarks extends BoundedOutOfOrdernessTimestampExtractor<Event> {
        public TimestampsAndWatermarks() {
            super(Time.milliseconds(OUT_OF_ORDERNESS));
        }
        @Override
        public long extractTimestamp(Event event) {
            return event.eventTime;
        }
    }
}

相关内容

  • 没有找到相关文章

最新更新