在map()或任何其他解决方案中使用sc.parallelize



我有以下问题:我需要从列A中找到每个id的列B中值的所有组合,并将结果返回为DataFrame

输入DataFrame

的示例如下
        A     B       
0       5    10       
1       1    20      
2       1    15       
3       3    50       
4       5    14       
5       1    30       
6       1    15       
7       3    33       

我需要得到以下输出DataFrame(它是为GraphXGraphFrame)

        src dist      A
0       10   14       5
1       50   33       3
2       20   15       1
3       30   15       1
4       20   30       1

到目前为止我想到的一个解决方案是:

df_result = df.drop_duplicates().
               map(lambda (A,B):(A,[B])).
               reduceByKey(lambda p, q: p + q).
               map(lambda (A,B_values_array):(A,[k for k in itertools.combinations(B_values_array,2)]))
print df_result.take(3)

输出:((((20、15),20(30日),(30日15))),(((10,14))),(((50岁,33))))

在这里我卡住了:(如何返回到我需要的数据帧?其中一个想法是使用parallelize:

import spark_sc
edges = df_result.map(lambda (A,B_pairs): spark_sc.sc.parallelize([(k[0],k[1],A) for k in B_pairs]))

对于spark_sc,我有另一个名为spark_sc.py的文件

def init():
    global sc
    global sqlContext
    sc = SparkContext(conf=conf,
                  appName="blablabla",
                  pyFiles=['my_file_with_code.py'])
    sqlContext = SQLContext(sc)

但是我的代码失败了:

AttributeError: 'module' object has no attribute 'sc'

如果我使用spark_sc.sc()不进入map()它工作。

你知道我在最后一步错过了什么吗?有可能使用parallelize()吗?还是需要完全不同的解决方案?谢谢!

您肯定需要另一个解决方案,可以像:

from pyspark.sql.functions import greatest, least, col
df.alias("x").join(df.alias("y"), ["A"]).select(
    least("x.B", "y.B").alias("src"), greatest("x.B", "y.B").alias("dst"), "A"
).where(col("src") != col("dst")).distinct()

地点:

df.alias("x").join(df.alias("y"), ["A"])

通过A连接表,

least("x.B", "y.B").alias("src")

greatest("x.B", "y.B")

选择id小的value作为源,id大的value作为目的。最后:

where(col("src") != col("dst"))

删除self循环。

在一般情况下,它是不可能使用SparkContext从一个动作或转换(并不是说它会有任何意义,这样做在您的情况下)。

最新更新