我正在使用MapSide连接,以便在我的MR作业中获得2个不同的文件。
Input File 1:
0,5.3
26,4.9
54,4.9
.
.
.
InputFile 2:
0 Anju,,3.6,IT,A,1.6,0.3
26 Remya,,3.3,EEE,B,1.6,0.3
54 akhila,,3.3,IT,C,1.3,0.3
我的意图是替换如下
Anju,5.3,3.6,IT,A,1.6,0.3
Remya,4.9,3.3,EEE,B,1.6,0.3
akhila,4.9,3.3,IT,C,1.3,0.3
我所做的是
为了得到2个文件作为输入,我使用了2个映射器(MultipleInput
)。
2个文件的第一列是文件偏移量
在第一个映射中,我发出第一个col作为键(可写入偏移量),其余作为第一个文件的值在第2个映射中,我也发出了第一个col作为键(可写入的偏移量),其余作为第2个文件
的值。在reducer中我可以得到
Reducer
key 0
value 5.3
value Anju,S,3.6,IT,A,1.6,0.3
Reducer
key 26
value Remya,,3.3,EEE,B,1.6,0.3
value 4.9
Reducer
key 54
value 4.9
value akhila,,3.3,IT,C,1.3,0.3
我将如何replace
价值任何想法?
我的方法是否正确,或者我是否应该遵循其他方法?请建议。
您可以使用以下代码进行替换:
String result = null;
String replacement = null;
for (Text value: values) {
String valueStr = value.toString();
if (valueStr.contains(",,")) {
result = valueStr;
} else {
replacement = valueStr;
}
}
if (result == null || replacement == null) {
return;
}
result = result.replaceFirst(",,", "," + replacement + ",");
// write result
但它不是MapSide连接。要执行MapSide连接,您应该在每个映射器(在设置阶段)中读取替换文件(InputFile 1
),然后在映射阶段将该数据与InputFile 2
连接。例子:
private Map < Integer, Double > replacements;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
replacements = new HashMap < Integer, Double > ();
// read FileInput 1 to replacements
// ...
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] parts = value.toString().split("t"); // 0 Anju,,3.6,IT,A,1.6,0.3
Integer id = Integer.parseInt(parts[0]);
if (!replacements.containsKey(id)) {
return; // can't replace
}
Double replacement = replacements.get(id);
String result = parts[1].replaceFirst(",,", "," + replacement + ",");
// write result to context
}