I have a dataset like the one below, and for each row I want a No_Of_Extensions column counting how many further extensions follow it in its chain (0 when extension_col1 is null):

| col1 | extension_col1 |
|------|----------------|
| 2345 | 2246 |
| 2246 | 2134 |
| 2134 | 2091 |
| 2091 | null |
| 1234 | 1111 |
| 1111 | null |
You can do this with window functions. First, create a group column grp using a cumulative conditional sum over extension_col1. Then apply the row_number function over a window partitioned by grp and ordered by col1, this time ascending, and you get the desired result:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._   // desc, sum, when, col, row_number
import spark.implicits._                   // for .toDF (pre-imported in spark-shell)
val df = Seq(
(Some(99985), Some(94904)), (Some(94904), Some(89884)),
(Some(89884), Some(88592)), (Some(88592), Some(86367)),
(Some(86367), Some(84121)), (Some(84121), None)
).toDF("col1", "extension_col1")
val w1 = Window.orderBy(desc("col1"))
val w2 = Window.partitionBy("grp").orderBy("col1")
val result = df.withColumn(
"grp",
sum(when(col("extension_col1").isNull, 1).otherwise(0)).over(w1)
).withColumn(
"No_Of_Extensions",
when(col("extension_col1").isNull, 0).otherwise(row_number().over(w2))
).drop("grp").orderBy(desc("col1"))
result.show
//+-----+--------------+----------------+
//| col1|extension_col1|No_Of_Extensions|
//+-----+--------------+----------------+
//|99985| 94904| 5|
//|94904| 89884| 4|
//|89884| 88592| 3|
//|88592| 86367| 2|
//|86367| 84121| 1|
//|84121| null| 0|
//+-----+--------------+----------------+
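To see how the cumulative conditional sum forms the groups, you can inspect the intermediate grp column before it is dropped. A minimal sketch reusing df and w1 from the snippet above (not part of the original answer):
// Each null in extension_col1, scanned in descending col1 order, starts a new group.
df.withColumn(
  "grp",
  sum(when(col("extension_col1").isNull, 1).otherwise(0)).over(w1)
).orderBy(desc("col1")).show()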
Note that the first sum uses a non-partitioned window, so all the data is moved into a single partition, which can hurt performance.
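If the real data carries a natural key that extension chains never cross (say a hypothetical account_id column, which does not exist in the sample data), both windows can be partitioned by that key so the cumulative sum no longer forces everything into one partition. A hedged sketch under that assumption:
// "account_id" is an assumed column for illustration only.
val w1p = Window.partitionBy("account_id").orderBy(desc("col1"))
val w2p = Window.partitionBy("account_id", "grp").orderBy("col1")
// The rest of the pipeline stays the same, with w1p/w2p in place of w1/w2.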
Spark SQL equivalent query:
SELECT col1,
extension_col1,
case when extension_col1 is null then 0 else row_number() over(partition by grp order by col1) end as No_Of_Extensions
FROM (
SELECT *,
sum(case when extension_col1 is null then 1 else 0 end) over(order by col1 desc) as grp
FROM df
)
ORDER BY col1 desc
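To run this SQL from Scala, the DataFrame has to be registered as a temporary view named df first; a minimal sketch:
// Register the DataFrame under the name used in the FROM clause, then run the query above.
df.createOrReplaceTempView("df")
spark.sql("""
  SELECT col1,
         extension_col1,
         case when extension_col1 is null then 0
              else row_number() over(partition by grp order by col1) end as No_Of_Extensions
  FROM (
    SELECT *,
           sum(case when extension_col1 is null then 1 else 0 end) over(order by col1 desc) as grp
    FROM df
  )
  ORDER BY col1 desc
""").show()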
An alternative to blackbishop's answer, since I think the data may not always be sorted, hence some alternative processing. I like the conditional summing, but it is not applicable here.
Honestly, at scale this is a poor use case for Spark, because I also cannot get around the single-partition aspect, as the other answer states. The partition size limit has been increased on newer Spark releases, but in this example the "list" could still be long.
Part 1 - generate the data
// 1. Generate data.
val df = Seq(
  (Some(2345), Some(22246)), (Some(22246), Some(2134)), (Some(2134), Some(2091)), (Some(2091), None),
  (Some(1234), Some(1111)), (Some(1111), None)
).toDF("col1", "extCol1")
Part 2 - the actual processing
//2. Narrow transform: add the position in the dataset, as values may not always be desc or asc.
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField,StructType,IntegerType, ArrayType, LongType}
val newSchema = StructType(df.schema.fields ++ Array(StructField("rowid", LongType, false)))
val rdd = df.rdd.zipWithIndex
val df2 = spark.createDataFrame(rdd.map{ case (row, index) => Row.fromSeq(row.toSeq ++ Array(index))}, newSchema) // Some cost
//3. Make groupings in record ranges. Cannot avoid the single-partition aspect, so this only works if the data fits into a single partition.
// At scale one would not really be able to do this unless there is some grouping characteristic.
val dfg = df2.filter(df2("extCol1").isNull)
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val winSpec1 = Window.orderBy(asc("rowid"))
val dfg2 = dfg.withColumn("prev_rowid_tmp", lag("rowid", 1, -1).over(winSpec1))
.withColumn("rowidFrom", $"prev_rowid_tmp" + 1)
.drop("prev_rowid_tmp")
.drop("extCol1")
.withColumnRenamed("rowid","rowidTo")
//4. Apply grouping of ranges of rows to data.
val df3 = df2.as("df2").join(dfg2.as("dfg2"),
$"df2.rowid" >= $"dfg2.rowidFrom" && $"df2.rowid" <= $"dfg2.rowidTo", "inner")
//5. Do the calcs.
val res = df3.withColumn("numExtensions", $"rowidTo" - $"rowid")
res.select("df2.col1", "extCol1", "numExtensions").show(false)
Returns:
+-----+-------+-------------+
|col1 |extCol1|numExtensions|
+-----+-------+-------------+
|2345 |22246 |3 |
|22246|2134 |2 |
|2134 |2091 |1 |
|2091 |null |0 |
|1234 |1111 |1 |
|1111 |null |0 |
+-----+-------+-------------+
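A hedged variation that is not part of either answer: once the rowid from step 2 exists, the conditional-sum idea from the first answer can replace the range self-join, although it still relies on the same single-partition window the comments warn about.
// Sketch only: reuses df2 (col1, extCol1, rowid) built in step 2 above.
val wAll = Window.orderBy(desc("rowid"))                     // global window: single partition
val wGrp = Window.partitionBy("grp").orderBy(desc("rowid"))  // one partition per chain
val alt = df2
  .withColumn("grp", sum(when($"extCol1".isNull, 1).otherwise(0)).over(wAll))
  .withColumn("numExtensions",
    when($"extCol1".isNull, 0).otherwise(row_number().over(wGrp) - 1))
alt.orderBy("rowid").select("col1", "extCol1", "numExtensions").show(false)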
For scenarios where the first column of the table is already sorted, a new group should be created every time the second column of the previous record is null, and a sequence-number column is then added within each group according to the requirement. Implementing this in SQL is quite cumbersome: you first need to create row-number and flag columns, and then group by the flag column and the row numbers. A common alternative is to fetch the raw data from the database and process it in Python or SPL. SPL, an open-source Java package, is easier to integrate into a Java program and yields simpler code. It expresses the algorithm in just two lines:
1 | =MYSQL.query("select * from t4") |
2 | =A1.group@i(#2[-1]==null).run(len=~.len(),~=~.derive(len-#:No_Of_Extensions)).conj() |
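For comparison, when the data fits in memory the same grouping idea can be written over plain Scala collections; a minimal sketch of the algorithm the SPL script expresses (names and structure are illustrative, not taken from the SPL answer):
// Rows in their original, already sorted order; a chain ends at each null.
val rows = Seq(
  (2345, Option(2246)), (2246, Option(2134)), (2134, Option(2091)), (2091, None),
  (1234, Option(1111)), (1111, None)
)
// Split the sequence into chains, closing a chain whenever the second column is null.
val chains = rows.foldLeft(Vector(Vector.empty[(Int, Option[Int])])) {
  case (acc, row @ (_, ext)) =>
    val extended = acc.init :+ (acc.last :+ row)
    if (ext.isEmpty) extended :+ Vector.empty else extended
}.filter(_.nonEmpty)
// Number each row with how many extensions still follow it in its chain.
val numbered = chains.flatMap { chain =>
  chain.zipWithIndex.map { case ((c1, ext), i) => (c1, ext, chain.length - 1 - i) }
}
numbered.foreach(println)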