为每组pyspark RDD/dataframe选择随机列



我的数据框架有10,000列,我必须对每个组应用一些逻辑(关键是区域和深度)。每个组最多使用10k列中的30列,这30列列表来自第二个数据集列"colList"。每个组将有2-3百万行。我的方法是按键分组,并像下面这样调用函数。但它失败了- 1。洗牌和2。2 .数据组大于2G(可通过重新分区解决,但成本高);非常缓慢的

def testfunc(iter):
   <<got some complex business logic which cant be done in spark API>>
resRDD = df.rdd.groupBy(region, dept).map(lambda x: testfunc(x))
输入:

region dept week val0 val1  val2  val3 ... val10000   
 US    CS   1     1    2    1     1   ...  2 
 US    CS   2     1.5  2    3     1   ...  2
 US    CS   3     1    2    2     2.1      2
 US    ELE  1     1.1  2    2     2.1      2
 US    ELE  2     2.1  2    2     2.1      2
 US    ELE  3     1    2    1     2   .... 2
 UE    CS   1     2    2    1     2   .... 2

每组选取的列数:(数据集2)

region dept colList   
 US    CS   val0,val10,val100,val2000 
 US    ELE  val2,val5,val800,val900
 UE    CS   val21,val54,val806,val9000

我的第二个解决方案是从输入数据创建一个只有30列的新数据集,并将这些列重命名为col1到col30。然后为每个列和组使用映射列表。然后我可以应用groupbyKey(假设),这将是Skinner比原始输入的10K列。

region dept week col0 col1  col2  col3 ... col30   
 US    CS   1     1    2    1     1   ...  2 
 US    CS   2     1.5  2    3     1   ...  2
 US    CS   3     1    2    2     2.1      2
 US    ELE  1     1.1  2    2     2.1      2
 US    ELE  2     2.1  2    2     2.1      2
 US    ELE  3     1    2    1     2   .... 2
 UE    CS   1     2    2    1     2   .... 2

可以有人帮助转换输入与10K到30列吗?或者任何其他替代方法都可以避免分组。

您可以使用create_map函数将所有10k列转换为每行映射。现在使用一个UDF,它接受映射、区域和深度,并将映射细化到30列,并确保所有30列始终具有相同的名称。最后,可以包装复杂的函数来接收映射,而不是原始的10K列。希望这将使它足够小,可以正常工作。

如果没有,你可以得到一个不同的区域和深度,如果有足够少,你可以循环通过一个,并按另一个分组。

最新更新