如何为bucketingsink函数flink设置动态基本路径



我正在从文件中获取一些JSON记录。我想解析JSON,然后基于JSON中的字段,更新铲斗功能基本路径。

对于例如:JSON记录中有一个字段名称'用户ID',并基于我想将基本路径更新为bucketingsink("/data/data/app/users/" user-ders-id-field-value "/")

我该怎么做?

代码: dataStream input = env.ReadTextFile("/home/home/user/desktop/jsonfile");

    DataStream<String> parsedJson = input.map((inputMsg)->{
        String json="";
        try{
            json=jsonParser.parse(inputMsg).getAsString();
        }catch (Exception e){
            e.printStackTrace();
        }
        return json;
    });
   parsedJson.addSink(new BucketingSink<>(""));
}

使用bucketingsink.setbucketer()方法设置创建的类,该类实现了扣子接口,并将user-id字段值用作sub-bucket路径。

相关内容

  • 没有找到相关文章

最新更新