How can the following grouping be created in Spark?
df = spark.createDataFrame(
    [(13, None, 17, 'data_a'),   # group 1
     (17, 13, 18, 'data_b'),     # group 1
     (18, 17, None, 'data_u'),   # group 1
     (14, None, 15, 'data_c'),   # group 2
     (15, 14, 16, 'data_d'),     # group 2
     (16, 15, 19, 'data_e'),     # group 2
     (19, 16, 22, 'data_f'),     # group 2
     (22, 19, 24, 'data_g'),     # group 2
     (24, 22, None, 'data_v'),   # group 2
     (20, None, None, 'data_w'), # group 3
     (21, None, 23, 'data_h'),   # group 4
     (23, 21, None, 'data_x')],  # group 4
    ['id', 'previous_id', 'next_id', 'data']
)
With these groups it would then be possible to build this desired result:
+------+------+------+
|id_min|id_max|  data|
+------+------+------+
|    13|    18|data_u|
|    14|    24|data_v|
|    20|    20|data_w|
|    21|    23|data_x|
+------+------+------+
There is no ID that is common to all rows of a group. The groups also differ in size, so a fixed number of self-joins is not really an option. However, the ids form disjoint clusters: an id can never belong to more than one group. Does this grouping call for ML? How can it be done?
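One non-ML framing: each group is a connected component of the graph whose edges are the (id, next_id) links. A minimal sketch of that route, assuming the third-party graphframes package is installed and a Spark checkpoint directory is configured (connectedComponents requires one; the path below is only an example):

from graphframes import GraphFrame
import pyspark.sql.functions as F

spark.sparkContext.setCheckpointDir('/tmp/spark-checkpoints')  # example path, an assumption

vertices = df.select('id')
edges = (df.where(F.col('next_id').isNotNull())
           .select(F.col('id').alias('src'), F.col('next_id').alias('dst')))

# rows of the same chain receive the same 'component' value
components = GraphFrame(vertices, edges).connectedComponents()

Joining that component column back onto df and aggregating would yield id_min, id_max and the final row's data per group. But can it be done in plain PySpark, without an extra package?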
This just takes a bit of engineering. Let's try:
from pyspark.sql import Window
import pyspark.sql.functions as F

new = (df.select('*', F.monotonically_increasing_id().alias('index'),
                 # helper columns: the row's own id where previous_id/next_id is null, else 0
                 *[F.when(df[x].isNull(), F.col('id')).otherwise(0).alias(f"{x}_g")
                   for x in df.drop('id', 'data').columns])
       # a running sum of previous_id_g only changes on a chain-start row
       # (previous_id is null), so all rows of one chain share a grouper value
       .withColumn('grouper', F.sum('previous_id_g').over(Window.orderBy('index')))
       # per chain, the only nonzero previous_id_g is the first id and the only
       # nonzero next_id_g is the last id; last('data') is the final row's data
       .groupby('grouper').agg(F.max('previous_id_g').alias('id_min'),
                               F.max('next_id_g').alias('id_max'),
                               F.last('data').alias('data'))
       .drop('grouper'))
new.show()
+------+------+------+
|id_min|id_max|  data|
+------+------+------+
|    13|    18|data_u|
|    14|    24|data_v|
|    20|    20|data_w|
|    21|    23|data_x|
+------+------+------+
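A design note on this sketch: it relies on the rows already being listed chain by chain, because monotonically_increasing_id only preserves the existing row order, and last('data') likewise depends on that order. The unpartitioned window also moves every row into a single partition, which is fine at this scale but will not distribute. For large or unordered input, the connected-components framing from the question is the more robust route.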