查找所有用户的朋友#:如何使用Hadoop Mapreduce实现



假设我有一个输入如下:

(1,2)(2,1)(1,3)(3,2)(2,4)(4,1) 

预期输出如下:

(1,(2,3,4)) -> (1,3) //second index is total friend #
(2,(1,3,4)) -> (2,3)
(3,(1,2))   -> (3,2)
(4,(1,2))   -> (4,2)

我知道如何在 java 中使用哈希集来做到这一点。但是不知道这如何与mapreduce模型一起工作。任何人都可以在这个问题上提出任何想法或示例代码吗?我会很感激的。

------------------------------------------------------------------------------------

这是我天真的解决方案:1 个映射器,两个减速器。映射器将组织输入(1,2(,(2,1(,(1,3(;

将输出组织为

*(1,hashset<2>(,(2,hashSet<1>(,(1,hashset<2>(,(2,hashset<1>(,(1,hashset<3>(,(3,hashset<1>(.*

减速器1

将映射器的输出作为输入,输出为:

*(1,hashset<2,3>(, (3,hashset<1>(和 (2,hashset<1>(*

减速器2

将 reducer1 的输出作为输入,输出为:

*(1,2(、(3,1( 和 (2,1(*

这只是我幼稚的解决方案。我不确定这是否可以通过hadoop的代码来完成。

我认为应该有一个简单的方法来解决这个问题。

Mapper Input: (1,2)(2,1)(1,3)(3,2)(2,4)(4,1)

只需为每对发出两条记录,如下所示:

Mapper Output/ Reducer Input:
Key => Value
1 => 2
2 => 1
2 => 1
1 => 2
1 => 3
3 => 1
3 => 2
2 => 3
2 => 4
4 => 2
4 => 1
1 => 1

在减速器方面,你会得到4个不同的组,如下所示:

Reducer Output:
Key => Values
1 => [2,3,4]
2 => [1,3,4]
3 => [1,2]
4 => [1,2]

现在,您可以根据需要格式化结果。 :)让我知道是否有人可以在这种方法中看到任何问题

1( 介绍/问题

继续使用作业驱动因素之前,重要的是要了解,在简单的方法中,化简器的值应按升序排序。第一个想法是传递未排序的值列表,并在每个键的化简器中进行一些排序。这有两个缺点:

1( 对于大型值列表,它很可能效率不高

2(如果在集群的不同部分处理这些对,框架如何知道(1,4(是否等于(4,1(?

2( 理论解决方案

在Hadoop中做到这一点的方法是通过创建合成密钥来"模拟"框架。

所以我们的地图功能而不是"概念上更合适"(如果我可以这么说的话(

map(k1, v1) -> list(k2, v2)

如下:

map(k1, v1) -> list(ksynthetic, null)

正如你注意到的,我们放弃了值的使用(化简器仍然得到一个null值的列表,但我们并不真正关心它们(。这里发生的事情是这些值实际上包含在 ksynthetic .下面是相关问题的示例:

`map(1, 2) -> list([1,2], null)

但是,还需要执行更多操作,以便对键进行适当的分组和分区,并在化简器中实现正确的结果。

3( Hadoop 实现

我们将实现一个名为 FFGroupKeyComparator 的类和 FindFriendPartitioner 的类。

这是我们FFGroupKeyComparator

public static class FFGroupComparator extends WritableComparator
{
    protected FFGroupComparator() 
    {
        super(Text.class, true);
    }
    @Override
    public int compare(WritableComparable w1, WritableComparable w2)
    {
        Text t1 = (Text) w1;
        Text t2 = (Text) w2;
        String[] t1Items = t1.toString().split(",");
        String[] t2Items = t2.toString().split(",");
        String t1Base = t1Items[0];
        String t2Base = t2Items[0];
        int comp = t1Base.compareTo(t2Base); // We compare using "real" key part of our synthetic key
        return comp;
    }
}  

此类将充当我们的分组比较器类。它控制将哪些键组合在一起以进行对Reducer.reduce(Object, Iterable, org.apache.hadoop.mapreduce.Reducer.Context)的单个调用 这非常重要,因为它确保每个化简器获得适当的合成键(从真正的键来看(。

由于Hadoop在具有许多节点的集群中运行,因此确保减少任务的数量与分区一样多非常重要。它们的数量应该与实际密钥(不是合成的(相同。因此,通常我们使用哈希值来执行此操作。在我们的例子中,我们需要做的是根据真实键的哈希值(逗号之前(计算合成键所属的分区。所以我们FindFriendPartitioner如下:

public static class FindFriendPartitioner extends Partitioner  implements Configurable
{
    @Override
    public int getPartition(Text key, Text NullWritable, int numPartitions) 
    {
        String[] keyItems = key.toString().split(",");
        String keyBase = keyItems[0];
        int part  = keyBase.hashCode() % numPartitions;
        return part;
    }

所以现在我们都准备好写实际的工作并解决我们的问题。

我假设您的输入文件如下所示:

1,2
2,1
1,3
3,2
2,4
4,1

我们将使用TextInputFormat.

以下是使用 Hadoop 1.0.4 的作业驱动程序的代码:

public class FindFriendTwo
{       
    public static class FindFriendMapper extends Mapper<Object, Text, Text, NullWritable> {
public void map(Object, Text value, Context context) throws IOException, InterruptedException 
{       
        context.write(value, new NullWritable() );
        String tempStrings[] = value.toString().split(","); 
        Text value2 = new Text(tempStrings[1] + "," + tempStrings[0]); //reverse relationship
        context.write(value2, new NullWritable());
}

}

请注意,我们还在 map 函数中传递了反向关系。

例如,如果输入字符串是 (1,4(,我们一定不要忘记 (4,1(。

public static class FindFriendReducer extends Reducer<Text, NullWritable, IntWritable, IntWritable> { 
    private Set<String> friendsSet;
    public void setup(Context context)
    {
        friendSet = new LinkedHashSet<String>();
    }
    public void reduce(Text syntheticKey, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
        String tempKeys[] = syntheticKey.toString().split(",");
        friendsSet.add(tempKeys[1]);
        if( friendsList.size() == 2 )
        {
            IntWritable key = Integer.parseInt(tempKeys[0]);
            IntWritable value = Integer.parseInt(tempKeys[1]);                
            write(key, value);
        }

   }
}

最后,我们必须记住在我们的主类中包含以下内容,以便框架使用我们的类。

jobConf.setGroupingComparatorClass(FFGroupComparator.class);
jobConf.setPartitionerClass(FindFriendPartitioner.class);

我会按如下方式处理这个问题。

  • 确保我们拥有所有的关系,并且每个关系恰好一次。
  • 只需数一数

关于我的笔记:

  • 我的键值对表示法是:K -> V
  • 键和值几乎总是一个数据结构(不仅仅是字符串或整数(
  • 从不使用密钥来处理数据。关键只是为了控制从映射器到右侧减速器的流量。在所有其他地方,我根本不看钥匙。框架确实在任何地方都需要密钥。使用"(("我的意思是说有一个键我完全忽略了。
  • 关于我的 aproach 的关键是它永远不需要同时内存中的"所有朋友"(因此它在非常大的情况下也有效(。

我们从很多开始

(x,y)

而且我们知道数据集中没有所有关系。

映射器:创建所有关系

Input:  ()    -> (x,y)
Output: (x,y) -> (x,y)
        (y,x) -> (y,x)

化简器:删除重复项(仅从迭代器输出第一个(

Input:  (x,y) -> [(x,y),(x,y),(x,y),(x,y),.... ]
Output: ()    -> (x,y)

映射器:"字数">

Input:  ()  -> (x,y)
Output: (x) -> (x,1)

减速器:计数它们

Input:  (x) -> [(x,1),(x,1),(x,1),(x,1),.... ]
Output: ()  -> (x,N)

在这么多优秀工程师的帮助下,我终于尝试了这个解决方案。

只有一个映射器和一个化简器。这里没有组合器。

映射器的输入:

1,2
2,1
1,3
3,1
3,2
3,4
5,1

映射器的输出:

1,2
2,1
1,2
2,1
1,3
3,1
1,3
3,1
4,3
3,4
1,5
5,1

减速机输出:

1   3
2   2
3   3
4   1
5   1

第一个是用户,第二个是朋友#。

在化简器阶段,我将hashSet添加到辅助分析中。感谢@Artem齐基里迪斯@Ashish你的回答给了我一个很好的线索。

编辑:

添加的代码:

//映射

public static class TokenizerMapper extends
        Mapper<Object, Text, Text, Text> {
    private Text word1 = new Text();
    private Text word2 = new Text();
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer itr = new StringTokenizer(line,",");
        if(itr.hasMoreElements()){
         word1.set(itr.nextToken().toLowerCase());
        }
        if(itr.hasMoreElements()){
            word2.set(itr.nextToken().toLowerCase());
        }
        context.write(word1, word2);
        context.write(word2, word1);

//
} }

//还原剂

public static class IntSumReducer extends
        Reducer<Text, Text, Text, IntWritable> {
    private IntWritable result = new IntWritable();
    public void reduce(Text key, Iterable<Text> values,
            Context context) throws IOException, InterruptedException {
        HashSet<Text> set = new HashSet<Text>();
          int sum = 0;
          for (Text val : values) {
                if(!set.contains(val)){
                    set.add(val);
                    sum++;
                }
          }   
          result.set(sum);
          context.write(key, result);
    }
}

相关内容

  • 没有找到相关文章

最新更新