通过 MapReduce in spark (python) 理解组



我正在尝试一个小程序,我正在考虑一个员工数据集,并试图计算各个部门分配的工资总和。我有一个可复制的例子。

 emp_list=[(u'ACC', [u'101', u'a', u'ACC', u'1000']),
 (u'SALES', [u'102', u'b', u'SALES', u'2000']),
 (u'IT', [u'103', u'c', u'IT', u'3000']),
 (u'ACC', [u'104', u'd', u'ACC', u'4000']),
 (u'ACC', [u'105', u'e', u'ACC', u'5000']),
 (u'HR', [u'106', u'f', u'HR', u'6000']),
 (u'ACC', [u'107', u'g', u'ACC', u'7000']),
 (u'FIN', [u'108', u'h', u'FIN', u'8000']),
 (u'ACC', [u'109', u'k', u'ACC', u'9000']),
 (u'HR', [u'1010', u'l', u'HR', u'10000']),
 (u'ACC', [u'1011', u'm', u'ACC', u'11000']),
 (u'ACC', [u'1012', u'n', u'ACC', u'12000']),
 (u'FIN', [u'1013', u'o', u'FIN', u'13000']),
 (u'IT', [u'1014', u'p', u'IT', u'14000'])]
emp=sc.parallelize(emp_list)
emp.reduceByKey(lambda x,y : x[3]+y[3]).take(10)

我得到的输出是:

[(u'ACC', u'00'),
 (u'HR', u'600010000'),
 (u'FIN', u'800013000'),
 (u'SALES', [u'102', u'b', u'SALES', u'2000']),
 (u'IT', u'300014000')]

谁能解释一下为什么我对ACCSALES部门有奇怪的价值观。我也想看看这两个人的工资。

你会得到奇怪的值,因为你的函数逻辑是无效的。如果你使用Scala而不是Python,这甚至不会编译。应用 LHS 和 RHS reduceByKey并且返回类型应为同一类型时:

reduceByKey(func: (V, V) ⇒ V): RDD[(K, V)]

func应该是联想的。

在您的情况下,类型不匹配(输入是列表,返回类型是字符串),并且函数不是关联的。为了理解发生了什么,让我们考虑两种不同的情况:

  1. 每个键只有一个值。由于未应用func因此此值作为输出。因此(u'SALES', [u'102', u'b', u'SALES', u'2000'])

  2. 每个键多个值。让我们以 ACC 中的值子集为例,并假设操作顺序定义如下

    (
      # 1st partition
      ([u'101', u'a', u'ACC', u'1000'], [u'104', u'd', u'ACC', u'4000']),
      # 2nd partition
      ([u'105', u'e', u'ACC', u'5000'], [u'107', u'g', u'ACC', u'7000'])
    )
    

    第一次应用func后,我们得到:

    (
       u'10004000',
       ([u'105', u'e', u'ACC', u'5000'], [u'107', u'g', u'ACC', u'7000'])
    )
    

    第二次应用func后,我们得到

    (
       u'10004000',
       u'50007000'
    )
    

    最后

    u'00'
    

    实际上,括号大小可能因配置而异,因此您可以获得不同的输出。

要获得正确的结果,您应该按照@alexs或map的建议使用aggregateByKey/combineByKeymap + reduce,后跟groupByKeymapValues。最后一个应该是最有效的方法,因为它不需要中间对象:

emp.mapValues(lambda x: x[3]).groupByKey().mapValues(lambda xs: "".join(xs))

为了参考同样的事情,使用aggregateByKey

from operator import add
rdd.aggregateByKey("", lambda acc, x: acc + x[3], add)

也许这应该有效:

emp.map(lambda k, v: (k, v[3])).reduceByKey(lambda x,y : x+y).take(10)

虽然我没有 Spark 实例可以尝试

您需要记住,当您进行缩减时,还原的元素需要在还原第一次看到它们时以及所有其他时间保持相同的结构。因此,这是在您的示例中执行此操作的方法:

emp_list=[(u'ACC', [u'101', u'a', u'ACC', u'1000']),
 (u'SALES', [u'102', u'b', u'SALES', u'2000']),
 (u'IT', [u'103', u'c', u'IT', u'3000']),
 (u'ACC', [u'104', u'd', u'ACC', u'4000']),
 (u'ACC', [u'105', u'e', u'ACC', u'5000']),
 (u'HR', [u'106', u'f', u'HR', u'6000']),
 (u'ACC', [u'107', u'g', u'ACC', u'7000']),
 (u'FIN', [u'108', u'h', u'FIN', u'8000']),
 (u'ACC', [u'109', u'k', u'ACC', u'9000']),
 (u'HR', [u'1010', u'l', u'HR', u'10000']),
 (u'ACC', [u'1011', u'm', u'ACC', u'11000']),
 (u'ACC', [u'1012', u'n', u'ACC', u'12000']),
 (u'FIN', [u'1013', u'o', u'FIN', u'13000']),
 (u'IT', [u'1014', u'p', u'IT', u'14000'])]
emp=sc.parallelize(emp_list)
print emp.reduceByKey(lambda x,y : (1,1,1,x[3] + y[3]))
.map(lambda x: (x[0], x[1][3])).collect()
>> [(u'ACC', u'100040005000700090001100012000'), (u'HR', u'600010000'),
 (u'FIN', u'800013000'), (u'SALES', u'2000'), (u'IT', u'300014000')]

相关内容

  • 没有找到相关文章

最新更新