Does the Spark Java API have something like Hadoop's MultipleOutputs / FSDataOutputStream?



I am trying to output some specific records in the reduce part, depending on the values of the key-value records. In Hadoop MapReduce this can be done with code like the following:
private FSDataOutputStream hdfsOutWriter; // side-output stream, one file per reduce task

@Override
public void setup(Context context) throws IOException, InterruptedException {
  super.setup(context);
  Configuration conf = context.getConfiguration();
  FileSystem fs = FileSystem.get(conf);
  int taskID = context.getTaskAttemptID().getTaskID().getId();
  hdfsOutWriter = fs.create(new Path(fileName + taskID), true); // fileName: output path prefix defined elsewhere
}

@Override
public void reduce(Text key, Iterable<Text> value, Context context) throws IOException, InterruptedException {
  boolean isSpecificRecord = false;
  ArrayList<String> valueList = new ArrayList<String>();
  for (Text val : value) {
    String element = val.toString();
    if (filterFunction(element)) return; // drop this key entirely
    if (specificFunction(element)) isSpecificRecord = true;
    valueList.add(element);
  }
  String returnValue = anyFunction(valueList);
  String specificInfo = anyFunction2(valueList);
  if (isSpecificRecord) hdfsOutWriter.writeBytes(key.toString() + "\t" + specificInfo); // side output for specific records
  context.write(key, new Text(returnValue)); // normal reducer output
}

I want to run this process on a Spark cluster. Can the Spark Java API do this like the code above?

Just an idea of how you could simulate it:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.TaskContext

yoursRDD.mapPartitions(iter => {
   // runs once per partition, on the executor
   val fs = FileSystem.get(new Configuration())
   val ds = fs.create(new Path("outfileName_" + TaskContext.get.partitionId))
   ds.writeBytes("Put your results") // write the partition's side output here
   ds.close()
   iter // pass the partition's records through unchanged
})
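
For the Java API specifically, the same trick works through JavaRDD.mapPartitions. Here is a minimal, untested sketch under the same assumptions (yoursRDD, the element type String, and the output file name are placeholders; the lambda shape matches Spark 2.x, where the function returns an Iterator):

import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaRDD;

JavaRDD<String> result = yoursRDD.mapPartitions((Iterator<String> iter) -> {
    // Executed once per partition, on the executor.
    FileSystem fs = FileSystem.get(new Configuration());
    FSDataOutputStream ds = fs.create(
        new Path("outfileName_" + TaskContext.getPartitionId()));
    ds.writeBytes("Put your results"); // the partition's side output goes here
    ds.close();
    return iter; // pass the records through unchanged
});

Keep in mind that mapPartitions is lazy: the files are only written once an action (e.g. result.count()) forces evaluation. If you only need the side-output files and not a transformed RDD, foreachPartition is the more natural fit.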
