我正在维护一个简单的hadoop作业,该作业在HDFS中生成CSV文件作为输出。作业使用TextOutputFormat。我想将前导标题行添加到csv文件中(我知道零件文件是由不同的工人创建的,如果他们每个人都有标题,这不是问题)。如何做到这一点?
编辑:级联可能会有所帮助,但乍一看,我不想开始使用新的框架
编辑:
所以我想为输出的CSV文件添加标题。列数是确定性的。这是我的减速器类骨架:
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
public final class Reducer extends Reducer<Text, IntWritable, Text, IntWritable>
{
private MultipleOutputs<Text, IntWritable> mos;
private static final Text KEY_HOLDER = new Text();
private static final IntWritable VALUE_HOLDER = new IntWritable(1);
@Override
public void setup(final Context context)
{
mos = new MultipleOutputs<Text, IntWritable>(context);
}
@Override
public void cleanup(final Context context) throws IOException, InterruptedException
{
mos.close();
}
@Override
public void reduce(final Text key, final Iterable<IntWritable> values, final Context context)
throws IOException, InterruptedException
{
// [... some business logic ...]
mos.write(KEY_HOLDER, VALUE_HOLDER, "myFileName");
context.progress();
}
}
您可以覆盖mapper/reducer类中的run(),以便根据需要添加标头。例如,如果你想在你的最终o/p中添加FisrtName和LastName,你可以使用下面的代码作为参考。
public void run(Context context) throws IOException, InterruptedException
{
setup(context);
column = new Text("ColumnName") ;
values = new Text("FirstName" + "t" + "LastName") ;
context.write(column, values);
try
{
while (context.nextKey())
{
reduce(context.getCurrentKey(), context.getValues(), context);
Iterator<IntWritable> iter = context.getValues().iterator();
if(iter instanceof ReduceContext.ValueIterator)
{ ((ReduceContext.ValueIterator<IntWritable>)iter).resetBackupStore();
}
}
}
finally
{
cleanup(context);
}
}