PySpark中用列表列减去两个RDD失败



我有两个以下类型的RDD:

RDD[(int, List[(string, int)]

我想从两个RDD中得到相减集。代码如下:

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('xxxxx.com').getOrCreate()
rdd1 = spark.sparkContext.parallelize([(1, [("foo", 101), ("bar", 111)]), (2, [("foobar", 22), ("bar", 222)]), (3, [("foo", 333)])])
rdd2 = spark.sparkContext.parallelize([(1, [("foo", 101), ("bar", 111)]), (2, [("foobar", 22), ("bar", 222)]), (3, [("foo", 333)])])
rdd = rdd1.subtract(rdd2)
rdd.toDF().show()

然而,我得到了以下错误:

d[k] = comb(d[k], v) if k in d else creator(v)
TypeError: unhashable type: 'list'

但是如果我先把rdd改成DF,然后再做减法,它就能得到正确的答案。如果直接使用rdd,不知道如何解决问题。

rdd1 = spark.sparkContext.parallelize([(1, [("foo", 101), ("bar", 111)]), (2, [("foobar", 22), ("bar", 222)]), (3, [("foo", 333)])])
rdd2 = spark.sparkContext.parallelize([(1, [("foo", 101), ("bar", 111)]), (2, [("foobar", 22), ("bar", 222)]), (3, [("foo", 333)])])
rdd = rdd1.toDF().subtract(rdd2.toDF())
rdd.show()

首先,这在python中不起作用的原因很简单。subtract是关于寻找rdd1中不在rdd2中的元素。为此,spark将把具有相同哈希值的所有记录放在同一分区上,然后检查rdd1的每条记录,是否有来自rdd2的具有相同哈希值的相等记录。要做到这一点,记录必须是可散列的。在python中,元组是,但列表不是,因此会得到错误。有几种变通方法。最简单的方法可能是使用scala。列表在java/scala中是可哈希的。

val rdd1 = spark.sparkContext.parallelize(Seq((1, Seq(("foo", 101), ("bar", 111))), (2, Seq(("foobar", 22), ("bar", 222))), (3, Seq(("foo", 333)))))
val rdd2 = spark.sparkContext.parallelize(Seq((1, Seq(("foo", 101), ("bar", 111))), (2, Seq(("foobar", 22), ("bar", 222))), (3, Seq(("foo", 333)))))
// and you can check that this works
rdd1.subtract(rdd2).collect()
在python中,一种方法是定义自己的list类。它需要是可哈希的提供一个__eq__方法,让spark知道对象何时相等。这样的自定义类可以定义如下:
class my_list:
def __init__(self, list):
self.list=list

def __hash__(self):
my_hash = 0
for t in self.list:
my_hash+=hash(t[0])
my_hash+=t[1]
return my_hash

def __eq__(self, other_list):
self.list == other_list.list

然后,您可以检查这是否有效:

rdd1.mapValues(lambda x : my_list(x))
.subtract(rdd2.mapValues(lambda x: my_list(x)))
.collect()
注意:如果你在shell中工作,不要在shell中定义类,否则pickle将无法序列化你的类。在单独的文件(如my_list.py)中定义它,然后使用pyspark --py-files my_list.py和shell中的import my_list from my_list导入它。

相关内容

  • 没有找到相关文章

最新更新