KeyedProcessFunction 实现抛出空指针异常(NullPointerException)



第一个示例:来自 Flink 官方文档 "https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html"

我正在尝试重写 KeyedProcessFunction 类的 processElement() 方法。processElement 有 3 个参数,其中之一是上下文对象(Context)。当我尝试从上下文对象获取时间戳时,会抛出空指针异常。

在第一个示例代码中抛出Null指针异常的行是

current.lastModified = ctx.timestamp();

第二个示例:来自《Stream Processing with Apache Flink》一书的示例 6.5。

我在一个继承 KeyedProcessFunction 的类中声明了两个 ValueState 变量。当我尝试读取状态中最近一次更新的值时,它返回了 null。

在第二个示例代码中抛出空指针异常的行是

Double prevTemp = lastTemp.value(); if (prevTemp == 0.0 || r.temperature < prevTemp) {}

第一个示例代码

public class KeyedProcessFunctionExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment=
                StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStream<Tuple2<String, String>> stream =
                environment.socketTextStream("localhost",9090)
                        .map(new MapFunction<String, Tuple2<String, String>>() {
                            @Override
                            public Tuple2<String, String> map(String s) throws Exception {
                                String[] words= s.split(",");
                                return new Tuple2<>(words[0],words[1]);
                            }
                        });
        DataStream<Tuple2<String, Long>> result = stream
                .keyBy(0)
                .process(new CountWithTimeoutFunction());
        result.print();
        environment.execute("Keyed Process Function Example");
    }
    public static class CountWithTimeoutFunction extends KeyedProcessFunction<Tuple, Tuple2<String, String>, Tuple2<String, Long>> {
        private ValueState<CountWithTimestamp> state;
        @Override
        public void open(Configuration parameters) throws Exception {
            state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
        }
        @Override
        public void processElement(
                Tuple2<String, String> value,
                Context ctx,
                Collector<Tuple2<String, Long>> out) throws Exception {
            // retrieve the current count
            CountWithTimestamp current = state.value();
            if (current == null) {
                current = new CountWithTimestamp();
                current.key = value.f0;
            }
            // update the state's count
            current.count++;
            // set the state's timestamp to the record's assigned event time timestamp
            current.lastModified = ctx.timestamp();
            // write the state back
            state.update(current);
            // schedule the next timer 60 seconds from the current event time
            ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
        }
        @Override
        public void onTimer(
                long timestamp,
                OnTimerContext ctx,
                Collector<Tuple2<String, Long>> out) throws Exception {
            // get the state for the key that scheduled the timer
            CountWithTimestamp result = state.value();
            // check if this is an outdated timer or the latest timer
            if (timestamp == result.lastModified + 60000) {
                // emit the state on timeout
                out.collect(new Tuple2<String, Long>(result.key, result.count));
            }
        }
    }
}
/**
 * Mutable holder for the per-key state of the counting example: the key itself,
 * how many elements were counted, and the timestamp stored for the last one.
 */
class CountWithTimestamp {
    /** Key this counter belongs to; unset until the first element is processed. */
    public String key = null;

    /** Number of elements counted so far. */
    public long count = 0L;

    /** Timestamp recorded for the most recent element. */
    public long lastModified = 0L;
}

第二个示例

/**
 * Emits an alert for a sensor when its temperature has not dropped for one second
 * of processing time after it started rising.
 */
public class KeyedProcessFunctionTimerExample {
    public static void main(String[] args) throws Exception{
        // set up the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // use processing time for the application; the function below registers processing-time timers
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        DataStream<String> alerts =
                env.addSource(new SensorSource())
                .keyBy(r -> r.id)
                .process(new TempIncreaseAlertFunction());
        alerts.print();
        env.execute("Keyed Process Function execution");
    }

    /**
     * Tracks the last temperature and the currently registered timer per sensor key.
     * A timer is registered when the temperature rises and none is pending; it is
     * removed again as soon as the temperature drops.
     */
    public static class TempIncreaseAlertFunction extends KeyedProcessFunction<String, SensorReading, String> {
        /** Temperature of the previous reading for the current key; empty before the first reading. */
        private ValueState<Double> lastTemp;
        /** Timestamp of the pending timer for the current key; empty when no timer is pending. */
        private ValueState<Long> currentTimer;

        @Override
        public void open(Configuration parameters) throws Exception {
            lastTemp = getRuntimeContext().getState(new ValueStateDescriptor<Double>("lastTemp", Types.DOUBLE));
            currentTimer = getRuntimeContext().getState(new ValueStateDescriptor<Long>("timer", org.apache.flink.api.common.typeinfo.Types.LONG));
        }

        /**
         * Stores the reading's temperature and registers or removes the alert timer
         * depending on whether the temperature rose or fell compared to the previous reading.
         */
        @Override
        public void processElement(
                SensorReading r,
                Context ctx,
                Collector<String> out) throws Exception {
            // Both states are read as boxed values: value() yields null while nothing has been
            // stored for the key, so they must not be unboxed before a null check.
            Double prevTemp = lastTemp.value();
            // update last temp
            lastTemp.update(r.temperature);
            Long curTimerTimestamp = currentTimer.value();

            boolean noPreviousReading = prevTemp == null || prevTemp == 0.0;
            boolean noPendingTimer = curTimerTimestamp == null || curTimerTimestamp == 0;

            if (noPreviousReading || r.temperature < prevTemp) {
                // first reading or temperature decreased: drop the pending timer, if any
                if (curTimerTimestamp != null) {
                    ctx.timerService().deleteProcessingTimeTimer(curTimerTimestamp);
                }
                currentTimer.clear();
            }
            else if (r.temperature > prevTemp && noPendingTimer) {
                // temperature increased and no timer is pending: fire one second from now
                long timerTs = ctx.timerService().currentProcessingTime() + 1000;
                ctx.timerService().registerProcessingTimeTimer(timerTs);
                currentTimer.update(timerTs);
            }
        }

        /**
         * Emits the alert for the current key and forgets the timer that just fired.
         */
        @Override
        public void onTimer(
                long ts,
                OnTimerContext ctx,
                Collector<String> out) throws Exception {
            out.collect("Temperature of sensor ' " + ctx.getCurrentKey() + " ' monotonically increased for 1 second.");
            currentTimer.clear();
        }
    }
}

它不应抛出Null指针异常。您的帮助将不胜感激。谢谢!

在 Flink 中使用事件时间时,您必须为事件分配时间戳,并让数据流带有水印(watermark)。您可以通过实现时间戳提取器和水印生成器来做到这一点。

另请参见教程。

相关内容

  • 没有找到相关文章

最新更新