如何配置数据流程序中的用户功能



我创建了一个带有配置的流环境,并尝试在RichMapFunctionopen()方法中访问该配置。

的例子:

    Configuration conf = new Configuration();
    conf.setBoolean("a", true);
    StreamExecutionEnvironment env = 
        StreamExecutionEnvironment.createLocalEnvironment(8, conf);
    DataStreamSource<Integer> source = env.fromElements(5,5,5,5,5);
    source.map(new RichMapFunction<Integer, Integer>() {
        @Override
        public void open(Configuration parameters) throws Exception {
            boolean a = parameters.getBoolean("a", false);
            super.open(parameters);
        }
        @Override
        public Integer map(Integer value) throws Exception {
            return value;
        }
    }).print();
    env.execute();

调试open()方法时,发现配置为空。

我做错了什么?如何在流环境中将配置正确地传递给RichFunction ?

Flink的DataStream和DataSet API共享相同的用户函数接口,如您的示例中的RichMapFunction

Flink的RichFunctionopen方法的Configuration参数是从DataSet API的第一个版本遗留下来的,没有在DataStream API中使用。Flink序列化您在map()调用中提供的对象,并将其发送给并行工作程序。因此,您可以直接在对象中设置参数作为常规字段。

相关内容

  • 没有找到相关文章

最新更新