我在hadoop中处理一个输入日志文件,其中的键分布不均匀。这意味着减速器的值分布不均匀。例如,key1的值为1,key2的值为1000。
有没有任何方法可以对与同一个密钥相关的值进行负载平衡[我也不想修改我的密钥]
如果您知道哪些键将具有异常大的值,可以使用以下技巧。
您可以实现一个自定义的Partitioner
,它将确保您的每个倾斜键都指向一个分区,然后其他所有键都将通过它们的hashCode
分配到其余分区(这是默认的HashPartitioner
所做的)。
您可以通过实现以下接口创建自定义Partitioner
:
public interface Partitioner<K, V> extends JobConfigurable {
int getPartition(K key, V value, int numPartitions);
}
然后你可以告诉Hadoop使用你的Partitioner
:
conf.setPartitionerClass(CustomPartitioner.class);
也许您可以在使用减速器之前使用组合器?这是相当投机的。。。
其想法是将每组密钥划分为预设最大大小的分区,然后将这些划分的k/v对输出到reducer。这段代码假设您已经在配置中的某个位置设置了该大小。
public static class myCombiner extends Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
List<Text> textList = new ArrayList<Text>();
int part = 0;
while (values.iterator().hasNext()) {
if (textList.size() <= Integer.parseInt(context.getConfiguration().get("yourMaxSize"))) {
textList.add(values.iterator().next());
} else {
for(Text t : textList) {
//essentially partitioning each key...
context.write(new Text(key.toString() + "_" + Integer.toString(part)), t);
}
textList.clear();
}
part += 1;
}
//output any stragglers ...
for(Text t : textList) {
context.write(new Text(key.toString() + "_" + Integer.toString(part)), t);
}
}
}