我有以下问题:我需要从列A中找到每个id的列B中值的所有组合,并将结果返回为DataFrame
输入DataFrame
的示例如下 A B
0 5 10
1 1 20
2 1 15
3 3 50
4 5 14
5 1 30
6 1 15
7 3 33
我需要得到以下输出DataFrame(它是为GraphXGraphFrame)
src dist A
0 10 14 5
1 50 33 3
2 20 15 1
3 30 15 1
4 20 30 1
到目前为止我想到的一个解决方案是:
df_result = df.drop_duplicates().
map(lambda (A,B):(A,[B])).
reduceByKey(lambda p, q: p + q).
map(lambda (A,B_values_array):(A,[k for k in itertools.combinations(B_values_array,2)]))
print df_result.take(3)
输出:((((20、15),20(30日),(30日15))),(((10,14))),(((50岁,33))))
在这里我卡住了:(如何返回到我需要的数据帧?其中一个想法是使用parallelize:
import spark_sc
edges = df_result.map(lambda (A,B_pairs): spark_sc.sc.parallelize([(k[0],k[1],A) for k in B_pairs]))
对于spark_sc
,我有另一个名为spark_sc.py的文件
def init():
global sc
global sqlContext
sc = SparkContext(conf=conf,
appName="blablabla",
pyFiles=['my_file_with_code.py'])
sqlContext = SQLContext(sc)
但是我的代码失败了:
AttributeError: 'module' object has no attribute 'sc'
如果我使用spark_sc.sc()
不进入map()
它工作。
你知道我在最后一步错过了什么吗?有可能使用parallelize()
吗?还是需要完全不同的解决方案?谢谢!
您肯定需要另一个解决方案,可以像:
from pyspark.sql.functions import greatest, least, col
df.alias("x").join(df.alias("y"), ["A"]).select(
least("x.B", "y.B").alias("src"), greatest("x.B", "y.B").alias("dst"), "A"
).where(col("src") != col("dst")).distinct()
地点:
df.alias("x").join(df.alias("y"), ["A"])
通过A
连接表,
least("x.B", "y.B").alias("src")
和
greatest("x.B", "y.B")
选择id
小的value作为源,id大的value作为目的。最后:
where(col("src") != col("dst"))
删除self循环。
在一般情况下,它是不可能使用SparkContext
从一个动作或转换(并不是说它会有任何意义,这样做在您的情况下)。