未在Flink Richmapfunction中调用开放方法



我正在尝试将Apache Flink用于快捷方式描述的简单示例。但是,我注意到开放方法从未被调用,因此我在映射功能的第一行中得到了null指针例外。

public class MyMap extends RichMapFunction<Integer, Integer> {
    private ValueState<Integer> test;
    public void open(Configuration cfg) {
        test = getRuntimeContext().getState(new 
                ValueStateDescriptor<Integer>("myTest", Integer.class));
        System.out.println("1:" + test);
    }

    @Override
    public Integer map(Integer i) throws Exception {
        System.out.println("2:" + test.value()); //test is null here
        test.update(test.value() == null? 1: test.value() + 1);
        System.out.println("3:" + test.value());
        return i;
    }
}

更新:

您是否尝试过@override?

test test.value应该是null。您处于钥匙上下文,这意味着每个消息都有一个键已经知道的键。当您输入状态运营商时,Flink将尝试从配置的状态后端获取该键的值。除非您将ValueStateDescriptor配置为具有默认值(已弃用(,否则您第一次处理特定键的消息,该状态将为null。因此,您的应用程序应处理null值。

尝试以下示例(我的java生锈了,这在Scala中。问我是否需要帮助它(:

env.fromElements(("key1", 2),("key2", 4), ("key1", 5))
  .keyBy(_._1)
  .map {
    new RichMapFunction[(String, Int), (String, Int)] {
      lazy val stateTypeInfo: TypeInformation[Int] = implicitly[TypeInformation[Int]]
      lazy val serializer: TypeSerializer[Int] = stateTypeInfo.createSerializer(getRuntimeContext.getExecutionConfig)
      lazy val stateDescriptor = new ValueStateDescriptor[Int]("dummy state", serializer)
      var testVar: ValueState[Int] = _
      override def open(config: Configuration) = {
        testVar = this.getRuntimeContext.getState(stateDescriptor)
      }
      override def map(in: (String, Int)): (String, Int) = {
        println(s"message $in")
        println(s"state ${testVar.value()}")
        println()
        val sum = Option(testVar.value()).getOrElse(0) + in._2
        testVar.update(sum)
        (in._1, sum)
      }
  }
}.print()
env.execute() 

这应该产生:

message (key1,2) (first time key1 is seen)
state null       (state is null)
(key1,2)  (output)
message (key2,4) (first time key2 is seen)
state null       (state is null)
(key2,4)  (output)
message (key1,5) (second time key1 is seen!! We stored something there!)
state 2 (we stored a 2)
(key1,7) (thus output is 2+5=7)

我也有类似的问题。我可以通过更换以下导入来解决问题:

import java.lang.module.Configuration;

与此:

import org.apache.flink.configuration.Configuration;

相关内容

  • 没有找到相关文章

最新更新