在另一个RDD的基础上,修剪一个RDD



即使它们不共享相同的密钥,也有没有在一个rdd中过滤一个rdd的元素的方法?

我有两个RDD -ABC和XYZ

abc.collect()看起来像

[[a,b,c],[a,c,g,e],[a,e,b,x],[b,c]]

xyz.collect()看起来像

[a,b,c]

现在,我想滤除XYZ中不存在的RDD ABC的所有元素。

上述操作后,RDD ABC应该看起来像这样:

[[a,b,c],[a,c],[a,b],[b,c]]

我写了一个看起来像这样的代码:

def prune(eachlist):
    for i in eachlist:
        if i in xyz:
            return i
abc = abc.map(prune)

但是,这给了我这个错误:

Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation

从那以后,我尝试过过滤器,查找而不是地图无济于事。我不断遇到相同的错误。

我知道我可以在XYZ上进行集合操作并解决此错误,但是我正在大型数据集上运行此错误,并执行.collect()杀死我的AWS服务器,以超过过多的内存。因此,我需要在不使用.collect()或任何此类等效昂贵操作的情况下执行此操作。

您可以:

# Add index
abc.zipWithIndex() 
    # Flatten values
    .flatMap(lambda x: [(k, x[1]) for k in x[0]]) 
    # Join with xyz (works as filter)
    .join(xyz.map(lambda x: (x, None))) 
    # Group back by index
    .map(lambda x: (x[1][0], x[0])) 
    .groupByKey() 
    .map(lambda x: list(x[1]))

或者您可以在xyz上创建Bloom Filter并使用它映射abc

最新更新