Group rows by previous and next id in the same table



How can I create the following grouping in Spark?

df = spark.createDataFrame(
    [(13, None,   17, 'data_a'),  # group 1
     (17,   13,   18, 'data_b'),  # group 1
     (18,   17, None, 'data_u'),  # group 1
     (14, None,   15, 'data_c'),  # group 2
     (15,   14,   16, 'data_d'),  # group 2
     (16,   15,   19, 'data_e'),  # group 2
     (19,   16,   22, 'data_f'),  # group 2
     (22,   19,   24, 'data_g'),  # group 2
     (24,   22, None, 'data_v'),  # group 2
     (20, None, None, 'data_w'),  # group 3
     (21, None,   23, 'data_h'),  # group 4
     (23,   21, None, 'data_x'),  # group 4
    ],
    ['id', 'previous_id', 'next_id', 'data']
)

With those groups in place, it would be possible to produce this desired result:

+------+------+------+
|id_min|id_max|  data|
+------+------+------+
|    13|    18|data_u|
|    14|    24|data_v|
|    20|    20|data_w|
|    21|    23|data_x|
+------+------+------+

There is no id that is shared by every row of a group. The groups also differ in size, so a fixed number of self-joins is not really an option.

However, the ids do form clusters of a sort: an id cannot belong to more than one group. Does this grouping require ML? How can it be done?
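
For comparison, one non-ML way to look at this is as a connected-components problem on the graph of id -> next_id links. Below is a minimal sketch, assuming the optional GraphFrames package is installed and that /tmp/cc-checkpoints is a writable checkpoint directory (both are assumptions, not part of the original question):

from graphframes import GraphFrame          # assumed available; not a core Spark module
from pyspark.sql import functions as F

spark.sparkContext.setCheckpointDir('/tmp/cc-checkpoints')   # required by connectedComponents()

vertices = df.select('id')
edges = (df.where(F.col('next_id').isNotNull())
           .select(F.col('id').alias('src'), F.col('next_id').alias('dst')))

# Each chain of linked ids becomes one connected component.
components = GraphFrame(vertices, edges).connectedComponents()   # adds a 'component' column

result = (df.join(components, 'id')
    .groupBy('component')
    .agg(F.min(F.when(F.col('previous_id').isNull(), F.col('id'))).alias('id_min'),
         F.max(F.when(F.col('next_id').isNull(), F.col('id'))).alias('id_max'),
         F.max(F.when(F.col('next_id').isNull(), F.col('data'))).alias('data'))
    .drop('component'))

This handles arbitrarily long chains in one pass, at the cost of an extra dependency; the running-sum approach below stays in plain PySpark.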

I think this takes a bit of engineering. Let's try:

from pyspark.sql import Window, functions as F

new = (df
    # index to keep the original row order, plus helper columns carrying the row's id only where previous_id / next_id is null
    .select('*', F.monotonically_increasing_id().alias('index'),
            *[F.when(df[x].isNull(), F.col('id')).otherwise(0).alias(f"{x}_g")
              for x in df.drop('id', 'data').columns])
    # a running sum of previous_id_g only changes at a chain start, so it labels each group
    .withColumn('grouper', F.sum('previous_id_g').over(Window.partitionBy().orderBy('index')))
    # collapse each group to its boundary ids and the data of its last row
    .groupby('grouper')
    .agg(F.max('previous_id_g').alias('previous_id'),
         F.max('next_id_g').alias('next_id'),
         F.last('data').alias('data'))
    .drop('grouper'))
new.show()
+-----------+-------+------+
|previous_id|next_id|  data|
+-----------+-------+------+
|         13|     18|data_u|
|         14|     24|data_v|
|         20|     20|data_w|
|         21|     23|data_x|
+-----------+-------+------+
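
For the sample data (assuming the rows keep their insertion order), the grouper column works out to 13 for the first three rows, 27 (13 + 14) for the next six, 47 for the single-row group and 68 for the last two, i.e. one distinct value per chain. To line the columns up with the names asked for in the question, the boundary columns can simply be renamed; this is a small cosmetic tweak on top of the answer above, and the name result is illustrative:

# Rename the chain-boundary columns to match the desired output from the question.
result = (new.withColumnRenamed('previous_id', 'id_min')
             .withColumnRenamed('next_id', 'id_max'))
result.show()   # same rows as the desired id_min / id_max / data table, row order aside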
