假设我有一个输入如下:
(1,2)(2,1)(1,3)(3,2)(2,4)(4,1)
预期输出如下:
(1,(2,3,4)) -> (1,3) //second index is total friend #
(2,(1,3,4)) -> (2,3)
(3,(1,2)) -> (3,2)
(4,(1,2)) -> (4,2)
我知道如何在 java 中使用哈希集来做到这一点。但是不知道这如何与mapreduce模型一起工作。任何人都可以在这个问题上提出任何想法或示例代码吗?我会很感激的。
------------------------------------------------------------------------------------
这是我天真的解决方案:1 个映射器,两个减速器。映射器将组织输入(1,2(,(2,1(,(1,3(;
将输出组织为
*(1,hashset<2>(,(2,hashSet<1>(,(1,hashset<2>(,(2,hashset<1>(,(1,hashset<3>(,(3,hashset<1>(.*
减速器1:
将映射器的输出作为输入,输出为:
*(1,hashset<2,3>(, (3,hashset<1>(和 (2,hashset<1>(*
减速器2:
将 reducer1 的输出作为输入,输出为:
*(1,2(、(3,1( 和 (2,1(*
这只是我幼稚的解决方案。我不确定这是否可以通过hadoop的代码来完成。
我认为应该有一个简单的方法来解决这个问题。
Mapper Input: (1,2)(2,1)(1,3)(3,2)(2,4)(4,1)
只需为每对发出两条记录,如下所示:
Mapper Output/ Reducer Input:
Key => Value
1 => 2
2 => 1
2 => 1
1 => 2
1 => 3
3 => 1
3 => 2
2 => 3
2 => 4
4 => 2
4 => 1
1 => 1
在减速器方面,你会得到4个不同的组,如下所示:
Reducer Output:
Key => Values
1 => [2,3,4]
2 => [1,3,4]
3 => [1,2]
4 => [1,2]
现在,您可以根据需要格式化结果。 :)让我知道是否有人可以在这种方法中看到任何问题
1( 介绍/问题
在继续使用作业驱动因素之前,重要的是要了解,在简单的方法中,化简器的值应按升序排序。第一个想法是传递未排序的值列表,并在每个键的化简器中进行一些排序。这有两个缺点:
1( 对于大型值列表,它很可能效率不高
和
2(如果在集群的不同部分处理这些对,框架如何知道(1,4(是否等于(4,1(?
2( 理论解决方案
在Hadoop中做到这一点的方法是通过创建合成密钥来"模拟"框架。
所以我们的地图功能而不是"概念上更合适"(如果我可以这么说的话(
map(k1, v1) -> list(k2, v2)
如下:
map(k1, v1) -> list(ksynthetic, null)
正如你注意到的,我们放弃了值的使用(化简器仍然得到一个null
值的列表,但我们并不真正关心它们(。这里发生的事情是这些值实际上包含在 ksynthetic
.下面是相关问题的示例:
`map(1, 2) -> list([1,2], null)
但是,还需要执行更多操作,以便对键进行适当的分组和分区,并在化简器中实现正确的结果。
3( Hadoop 实现
我们将实现一个名为 FFGroupKeyComparator
的类和 FindFriendPartitioner
的类。
这是我们FFGroupKeyComparator
:
public static class FFGroupComparator extends WritableComparator
{
protected FFGroupComparator()
{
super(Text.class, true);
}
@Override
public int compare(WritableComparable w1, WritableComparable w2)
{
Text t1 = (Text) w1;
Text t2 = (Text) w2;
String[] t1Items = t1.toString().split(",");
String[] t2Items = t2.toString().split(",");
String t1Base = t1Items[0];
String t2Base = t2Items[0];
int comp = t1Base.compareTo(t2Base); // We compare using "real" key part of our synthetic key
return comp;
}
}
此类将充当我们的分组比较器类。它控制将哪些键组合在一起以进行对Reducer.reduce(Object, Iterable, org.apache.hadoop.mapreduce.Reducer.Context)
的单个调用 这非常重要,因为它确保每个化简器获得适当的合成键(从真正的键来看(。
由于Hadoop在具有许多节点的集群中运行,因此确保减少任务的数量与分区一样多非常重要。它们的数量应该与实际密钥(不是合成的(相同。因此,通常我们使用哈希值来执行此操作。在我们的例子中,我们需要做的是根据真实键的哈希值(逗号之前(计算合成键所属的分区。所以我们FindFriendPartitioner
如下:
public static class FindFriendPartitioner extends Partitioner implements Configurable
{
@Override
public int getPartition(Text key, Text NullWritable, int numPartitions)
{
String[] keyItems = key.toString().split(",");
String keyBase = keyItems[0];
int part = keyBase.hashCode() % numPartitions;
return part;
}
所以现在我们都准备好写实际的工作并解决我们的问题。
我假设您的输入文件如下所示:
1,2
2,1
1,3
3,2
2,4
4,1
我们将使用TextInputFormat
.
以下是使用 Hadoop 1.0.4 的作业驱动程序的代码:
public class FindFriendTwo
{
public static class FindFriendMapper extends Mapper<Object, Text, Text, NullWritable> {
public void map(Object, Text value, Context context) throws IOException, InterruptedException
{
context.write(value, new NullWritable() );
String tempStrings[] = value.toString().split(",");
Text value2 = new Text(tempStrings[1] + "," + tempStrings[0]); //reverse relationship
context.write(value2, new NullWritable());
}
}
请注意,我们还在 map
函数中传递了反向关系。
例如,如果输入字符串是 (1,4(,我们一定不要忘记 (4,1(。
public static class FindFriendReducer extends Reducer<Text, NullWritable, IntWritable, IntWritable> {
private Set<String> friendsSet;
public void setup(Context context)
{
friendSet = new LinkedHashSet<String>();
}
public void reduce(Text syntheticKey, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
String tempKeys[] = syntheticKey.toString().split(",");
friendsSet.add(tempKeys[1]);
if( friendsList.size() == 2 )
{
IntWritable key = Integer.parseInt(tempKeys[0]);
IntWritable value = Integer.parseInt(tempKeys[1]);
write(key, value);
}
}
}
最后,我们必须记住在我们的主类中包含以下内容,以便框架使用我们的类。
jobConf.setGroupingComparatorClass(FFGroupComparator.class);
jobConf.setPartitionerClass(FindFriendPartitioner.class);
我会按如下方式处理这个问题。
- 确保我们拥有所有的关系,并且每个关系恰好一次。
- 只需数一数
关于我的笔记:
- 我的键值对表示法是:K -> V
- 键和值几乎总是一个数据结构(不仅仅是字符串或整数(
- 我从不使用密钥来处理数据。关键只是为了控制从映射器到右侧减速器的流量。在所有其他地方,我根本不看钥匙。框架确实在任何地方都需要密钥。使用"(("我的意思是说有一个键我完全忽略了。
- 关于我的 aproach 的关键是它永远不需要同时内存中的"所有朋友"(因此它在非常大的情况下也有效(。
我们从很多开始
(x,y)
而且我们知道数据集中没有所有关系。
映射器:创建所有关系
Input: () -> (x,y)
Output: (x,y) -> (x,y)
(y,x) -> (y,x)
化简器:删除重复项(仅从迭代器输出第一个(
Input: (x,y) -> [(x,y),(x,y),(x,y),(x,y),.... ]
Output: () -> (x,y)
映射器:"字数">
Input: () -> (x,y)
Output: (x) -> (x,1)
减速器:计数它们
Input: (x) -> [(x,1),(x,1),(x,1),(x,1),.... ]
Output: () -> (x,N)
在这么多优秀工程师的帮助下,我终于尝试了这个解决方案。
只有一个映射器和一个化简器。这里没有组合器。
映射器的输入:
1,2
2,1
1,3
3,1
3,2
3,4
5,1
映射器的输出:
1,2
2,1
1,2
2,1
1,3
3,1
1,3
3,1
4,3
3,4
1,5
5,1
减速机输出:
1 3
2 2
3 3
4 1
5 1
第一个是用户,第二个是朋友#。
在化简器阶段,我将hashSet添加到辅助分析中。感谢@Artem齐基里迪斯@Ashish你的回答给了我一个很好的线索。
编辑:
添加的代码:
//映射
public static class TokenizerMapper extends
Mapper<Object, Text, Text, Text> {
private Text word1 = new Text();
private Text word2 = new Text();
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer itr = new StringTokenizer(line,",");
if(itr.hasMoreElements()){
word1.set(itr.nextToken().toLowerCase());
}
if(itr.hasMoreElements()){
word2.set(itr.nextToken().toLowerCase());
}
context.write(word1, word2);
context.write(word2, word1);
//
} }
//还原剂
public static class IntSumReducer extends
Reducer<Text, Text, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<Text> values,
Context context) throws IOException, InterruptedException {
HashSet<Text> set = new HashSet<Text>();
int sum = 0;
for (Text val : values) {
if(!set.contains(val)){
set.add(val);
sum++;
}
}
result.set(sum);
context.write(key, result);
}
}