在hadoop中将json数据保存到HDFS中



我有以下的Reducer类

public static class TokenCounterReducer extends Reducer<Text, Text, Text, Text> {
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        JSONObject jsn = new JSONObject();
        for (Text value : values) {
            String[] vals = value.toString().split("t");
            String[] targetNodes = vals[0].toString().split(",",-1);
            jsn.put("source",vals[1] );
            jsn.put("target",targetNodes);
        }
        // context.write(key, new Text(sum));
    }
}

通过示例(免责声明:这里是新手),我可以看到一般的输出类型似乎像键/值存储。

但是如果我在输出中没有任何键呢?或者如果我想要输出其他格式(在我的情况下是json)呢?

无论如何,从上面的代码:我想写json对象到HDFS?

这在Hadoop流媒体中是非常微不足道的。但是我如何在Hadoop java中做到这一点?

如果你只是想写一个JSON对象列表到HDFS而不关心键/值的概念,你可以在你的Reducer输出值中使用NullWritable:

public static class TokenCounterReducer extends Reducer<Text, Text, Text, NullWritable> {
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text value : values) {
            JSONObject jsn = new JSONObject();
            ....
            context.write(new Text(jsn.toString()), null);
        }
    }
}

请注意,您需要更改作业配置以执行以下操作:

job.setOutputValueClass(NullWritable.class);

通过将JSON对象写入HDFS,我理解您想要存储我上面描述的JSON的字符串表示。如果您想将JSON的二进制表示存储到HDFS中,则需要使用SequenceFile。显然,你可以为此编写自己的Writable,但我觉得这样更容易,如果你打算有一个简单的字符串表示。

您可以使用Hadoop的OutputFormat接口来创建您的自定义格式,这些格式将按照您的意愿写入数据。例如,如果您需要将数据写入JSON对象,那么您可以这样做:

public class JsonOutputFormat extends TextOutputFormat<Text, IntWritable> {
    @Override
    public RecordWriter<Text, IntWritable> getRecordWriter(
            TaskAttemptContext context) throws IOException, 
                  InterruptedException {
        Configuration conf = context.getConfiguration();
        Path path = getOutputPath(context);
        FileSystem fs = path.getFileSystem(conf);
        FSDataOutputStream out = 
                fs.create(new Path(path,context.getJobName()));
        return new JsonRecordWriter(out);
    }
    private static class JsonRecordWriter extends 
          LineRecordWriter<Text,IntWritable>{
        boolean firstRecord = true;
        @Override
        public synchronized void close(TaskAttemptContext context)
                throws IOException {
            out.writeChar('{');
            super.close(null);
        }
        @Override
        public synchronized void write(Text key, IntWritable value)
                throws IOException {
            if (!firstRecord){
                out.writeChars(",rn");
                firstRecord = false;
            }
            out.writeChars(""" + key.toString() + "":""+
                    value.toString()+""");
        }
        public JsonRecordWriter(DataOutputStream out) 
                throws IOException{
            super(out);
            out.writeChar('}');
        }
    }
}

如果你不想在你的输出中有键,就发出null,像:

context.write(NullWritable.get(), new IntWritable(sum));

HTH

相关内容

  • 没有找到相关文章

最新更新