如何用PySpark正确地完成两个rdd的完整外部连接



我正在寻找一种通过键组合两个rdd的方法。

给定

:

x = sc.parallelize([('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', 'FR', '75001'),
                ('_guid_XblBPCaB8qx9SK3D4HuAZwO-1cuBPc1GgfgNUC2PYm4=', 'TN', '8160'),
               ]
              )
y = sc.parallelize([('_guid_oX6Lu2xxHtA_T93sK6igyW5RaHH1tAsWcF0RpNx_kUQ=', 'JmJCFu3N'),
                ('_guid_hG88Yt5EUsqT8a06Cy380ga3XHPwaFylNyuvvqDslCw=', 'KNPQLQth'),
                ('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', 'KlGZj08d'),
               ]
              )

我找到解决办法了!然而,对于我想要做的事情,这个解决方案并不完全令人满意。我创建了一个函数来指定我的键,它将应用于我的名为"x"的rdd:

def get_keys(rdd):
    new_x = rdd.map(lambda item: (item[0], (item[1], item[2])))
    return new_x
new_x = get_keys(x)

给出:

[('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', ('FR', '75001')),
 ('_guid_XblBPCaB8qx9SK3D4HuAZwO-1cuBPc1GgfgNUC2PYm4=', ('TN', '8160'))]

:

new_x.union(y).map(lambda (x, y): (x, [y])).reduceByKey(lambda p, q : p + q).collect()

结果:

[('_guid_oX6Lu2xxHtA_T93sK6igyW5RaHH1tAsWcF0RpNx_kUQ=', ['JmJCFu3N']),
 ('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', [('FR', '75001'), 'KlGZj08d']),
 ('_guid_XblBPCaB8qx9SK3D4HuAZwO-1cuBPc1GgfgNUC2PYm4=', [('TN', '8160')]),
 ('_guid_hG88Yt5EUsqT8a06Cy380ga3XHPwaFylNyuvvqDslCw=', ['KNPQLQth'])]

我想要的是:

[('_guid_oX6Lu2xxHtA_T93sK6igyW5RaHH1tAsWcF0RpNx_kUQ=', (None, None, 'JmJCFu3N')),
 ('_guid_YWKnKkcrg_Ej0icb07bhd-mXPjw-FcPi764RRhVrOxE=', ('FR', '75001', 'KlGZj08d')),
 ('_guid_XblBPCaB8qx9SK3D4HuAZwO-1cuBPc1GgfgNUC2PYm4=', ('TN', '8160', None)),
 ('_guid_hG88Yt5EUsqT8a06Cy380ga3XHPwaFylNyuvvqDslCw=', (None, None, 'KNPQLQth'))]  

帮助吗?

为什么不呢?

>>> new_x.fullOuterJoin(y)

>>> x.toDF().join(y.toDF(), ["_1"], "fullouter").rdd

最新更新