连接两个RDD,然后按另一列分组



我有两个RDD,第一个RDD的格式为Code: string, Name: string,rdd2的格式是Code: string, Year: string, Delay: float

rdd1 = [('a', 'name1'), ('b', 'name2')]
rdd2 = [('a', '2000', 1.25), ('a', '2000', 2.0), ('b', '2010', -1.0)]

我想(在code上(执行联接,这样我就可以按name对数据进行分组,以便在delay上进行计数、平均值、最小值和最大值等聚合。

我试着在执行这样的联接后使值变平:

joined = rdd1.join(rdd2).map(lambda (keys, values): (keys,) + values)

但它出现了一个错误:缺少1个必需的位置参数。

我的联接结果也只显示[('code', ('name', 'year'))],不包括延迟值。我应该如何解决这个问题?

这在Python 3.x中不起作用,因为删除了对Tuple Parameter Unpacking(PEP-3113(的支持。因此产生了TypeError。

配对RDD作为密钥值加入工作,其中

(a,b(加入(a,c(会给你(a,(b,c(

因此,使其工作的一种方法是:

joined = rdd1.join(rdd2.map(lambda x: (x[0],x[1:])))
joined.map(lambda x: (x[0],)+ (x[1][0],) + x[1][1]).collect()
# Output
# [('b', 'name2', '2010', -1.0),
# ('a', 'name1', '2000', 1.25),
# ('a', 'name1', '2000', 2.0)]

在加入之前,您需要确保rdd2的形式为(key, value)。否则,第二个元素之后的元素将被丢弃。

rdd3 = rdd1.join(rdd2.map(lambda x: (x[0], (x[1], x[2]))))
rdd3.collect()
# [('b', ('name2', ('2010', -1.0))), ('a', ('name1', ('2000', 1.25))), ('a', ('name1', ('2000', 2.0)))]

如果要删除嵌套结构,可以再添加一个mapValues:

rdd3 = rdd1.join(rdd2.map(lambda x: (x[0], (x[1], x[2])))).mapValues(lambda x: (x[0], x[1][0], x[1][1]))
rdd3.collect()
# [('b', ('name2', '2010', -1.0)), ('a', ('name1', '2000', 1.25)), ('a', ('name1', '2000', 2.0))]

最新更新