我正在从文件中获取一些JSON记录。我想解析JSON,然后基于JSON中的字段,更新铲斗功能基本路径。
对于例如:JSON记录中有一个字段名称'用户ID',并基于我想将基本路径更新为bucketingsink("/data/data/app/users/" user-ders-id-field-value "/")
我该怎么做?
代码: dataStream input = env.ReadTextFile("/home/home/user/desktop/jsonfile");
DataStream<String> parsedJson = input.map((inputMsg)->{
String json="";
try{
json=jsonParser.parse(inputMsg).getAsString();
}catch (Exception e){
e.printStackTrace();
}
return json;
});
parsedJson.addSink(new BucketingSink<>(""));
}
使用bucketingsink.setbucketer()方法设置创建的类,该类实现了扣子接口,并将user-id
字段值用作sub-bucket路径。