I have a huge amount of data stored in HDFS that we want to index into Elasticsearch. The straightforward idea is to use the elasticsearch-hadoop library.
I followed the concepts from this video, and this is the code I wrote for the job.
import java.io.IOException;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.elasticsearch.hadoop.mr.EsOutputFormat;

public class TestOneFileJob extends Configured implements Tool {

    public static class Tokenizer extends MapReduceBase
            implements Mapper<LongWritable, Text, LongWritable, MapWritable> {

        private final MapWritable map = new MapWritable();
        private final Text key = new Text("test");

        @Override
        public void map(LongWritable arg0, Text value,
                        OutputCollector<LongWritable, MapWritable> output, Reporter reporter)
                throws IOException {
            // Wrap the whole input line under a single field named "test".
            map.put(key, value);
            output.collect(arg0, map);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        JobConf job = new JobConf(getConf(), TestOneFileJob.class);
        job.setJobName("demo.mapreduce");

        job.setInputFormat(TextInputFormat.class);
        job.setOutputFormat(EsOutputFormat.class);
        job.setMapperClass(Tokenizer.class);
        job.setMapOutputValueClass(MapWritable.class);
        job.setSpeculativeExecution(false);

        FileInputFormat.setInputPaths(job, new Path(args[1]));
        job.set("es.resource", args[2]);

        JobClient.runJob(job);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new TestOneFileJob(), args));
    }
}
The job runs fine, but the entire JSON ends up in a single field named test in Elasticsearch. It is clear that the field name is the key in the line private final Text key = new Text("test");, but what I need is for the JSON's own keys to become the document's fields.
Here is how the document appears in Elasticsearch.
{
"_index": "test",
"_type": "test",
"_id": "AVEzNbg4XbZ07JYtWKzv",
"_score": 1,
"_source": {
"test": "{"id":"tag:search.twitter.com,2005:666560492832362496","objectType":"activity","actor":{"objectType":"person","id":"id:twitter.com:2305228178","link":"http://www.twitter.com/alert01","displayName":"Himanshu","postedTime":"2014-01-22T17:49:57.000Z","image":"https://pbs.twimg.com/profile_images/468092875440275456/jJkHRnQF_normal.jpeg","summary":"A Proud Indian ; A Nationalist ; Believe in India First","links":[{"href":null,"rel":"me"}],"friendsCount":385,"followersCount":2000,"listedCount":83,"statusesCount":103117,"twitterTimeZone":"New Delhi","verified":false,"utcOffset":"19800","preferredUsername":"alert01","languages":["en-gb"],"favoritesCount":10},"verb":"share","postedTime":"2015-11-17T10:16:20.000Z","generator":{"displayName":"Twitter for Android","link":"http://twitter.com/download/android"},"provider":{"objectType":"service","displayName":"Twitter","link":"http://www.twitter.com"},"link":"http://twitter.com/alert01/statuses/666560492832362496","body":"RT @UnSubtleDesi: Raje didnt break rules bt Media hounded her for weeks demndng resignatn on \"moral ground\".A massve dynasty scam unfoldng …","object":{"id":"tag:search.twitter.com,2005:666559923673653248","objectType":"activity","actor":{"objectType":"person","id":"id:twitter.com:17741799","link":"http://www.twitter.com/UnSubtleDesi","displayName":"Vande Mataram","postedTime":"2008-11-29T21:12:05.000Z","image":"https://pbs.twimg.com/profile_images/648362451717648384/-7oGuhfN_normal.jpg","summary":"I apologise if I end up offending u unintentionally. In all probability, it was acutely intentional. http://saffronscarf.blogspot.in","links":[{"href":null,"rel":"me"}],"friendsCount":786,"followersCount":25198,"listedCount":155,"statusesCount":71853,"twitterTimeZone":null,"verified":false,"utcOffset":null,"preferredUsername":"UnSubtleDesi","languages":["en"],"favoritesCount":21336},"verb":"post","postedTime":"2015-11-17T10:14:04.000Z","generator":{"displayName":"Twitter for Android","link":"http://twitter.com/download/android"},"provider":{"objectType":"service","displayName":"Twitter","link":"http://www.twitter.com"},"link":"http://twitter.com/UnSubtleDesi/statuses/666559923673653248","body":"Raje didnt break rules bt Media hounded her for weeks demndng resignatn on \"moral ground\".A massve dynasty scam unfoldng here. Eerie silence","object":{"objectType":"note","id":"object:search.twitter.com,2005:666559923673653248","summary":"Raje didnt break rules bt Media hounded her for weeks demndng resignatn on \"moral ground\".A massve dynasty scam unfoldng here. Eerie silence","link":"http://twitter.com/UnSubtleDesi/statuses/666559923673653248","postedTime":"2015-11-17T10:14:04.000Z"},"inReplyTo":{"link":"http://twitter.com/UnSubtleDesi/statuses/666554154169446400"},"favoritesCount":5,"twitter_entities":{"hashtags":[],"urls":[],"user_mentions":[],"symbols":[]},"twitter_filter_level":"low","twitter_lang":"en"},"favoritesCount":0,"twitter_entities":{"hashtags":[],"urls":[],"user_mentions":[{"screen_name":"UnSubtleDesi","name":"Vande Mataram","id":17741799,"id_str":"17741799","indices":[3,16]}],"symbols":[]},"twitter_filter_level":"low","twitter_lang":"en","retweetCount":9,"gnip":{"matching_rules":[{"tag":"ISIS40"}],"klout_score":54,"language":{"value":"en"}}}"
}
}
One option is to parse the JSON manually and assign a field for every key in it; a rough sketch of that approach is shown below.
Are there any other options?
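For reference, a minimal sketch of that manual-parsing option could look like the following, assuming Jackson is on the classpath and that only the top-level keys of each tweet need to become separate fields (the class name JsonParsingMapper is purely illustrative; it would sit inside TestOneFileJob next to Tokenizer and reuse its imports, plus the ones listed here):

import java.util.Iterator;
import java.util.Map;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.hadoop.io.NullWritable;

public static class JsonParsingMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, NullWritable, MapWritable> {

    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public void map(LongWritable offset, Text line,
                    OutputCollector<NullWritable, MapWritable> output, Reporter reporter)
            throws IOException {
        MapWritable doc = new MapWritable();
        // Copy every top-level key of the JSON document into its own field.
        JsonNode root = mapper.readTree(line.toString());
        Iterator<Map.Entry<String, JsonNode>> fields = root.fields();
        while (fields.hasNext()) {
            Map.Entry<String, JsonNode> field = fields.next();
            doc.put(new Text(field.getKey()), new Text(field.getValue().toString()));
        }
        output.collect(NullWritable.get(), doc);
    }
}

Nested objects would still be stored as JSON strings with this sketch, which is exactly the kind of extra work I would like to avoid.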
You need to set es.input.json to true.
job.set("es.input.json","true");
job.setMapOutputValueClass(Text.class);
This tells elasticsearch-hadoop that the data is already in JSON format. Your mapper output should then look like
output.collect(NullWritable.get(), value);
where value is the JSON string.
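Putting that together, a minimal sketch of the adjusted mapper and job settings could look like this (the class name JsonPassThroughMapper is illustrative; it needs an extra import of org.apache.hadoop.io.NullWritable and otherwise reuses the job from the question):

public static class JsonPassThroughMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, NullWritable, Text> {

    @Override
    public void map(LongWritable offset, Text value,
                    OutputCollector<NullWritable, Text> output, Reporter reporter)
            throws IOException {
        // Each input line is already a complete JSON document, so emit it unchanged.
        output.collect(NullWritable.get(), value);
    }
}

// In run(), flag the input as raw JSON and switch the mapper and value class:
job.set("es.input.json", "true");
job.setMapperClass(JsonPassThroughMapper.class);
job.setMapOutputValueClass(Text.class);

With es.input.json enabled, elasticsearch-hadoop indexes the string as the document itself, so every key in the tweet becomes its own field instead of everything landing under test.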