spark-sql查找记录的扩展名数量



我有一个数据集,如下

col1 extension_col1
2345 2246
2246 2134
2134 2091
2091
1234 1111
1111

您可以使用一些Window函数来实现这一点。首先,使用extension_col1上的累积条件和,创建一个组列grp。然后,在一个由grp划分并由col1排序的窗口上使用row_number函数,但这次是升序,您会得到所需的结果:

import org.apache.spark.sql.expressions.Window
val df = Seq(
(Some(99985), Some(94904)), (Some(94904), Some(89884)),
(Some(89884), Some(88592)), (Some(88592), Some(86367)),
(Some(86367), Some(84121)), (Some(84121), None)
).toDF("col1", "extension_col1")
val w1 = Window.orderBy(desc("col1"))
val w2 = Window.partitionBy("grp").orderBy("col1")
val result = df.withColumn(
"grp",
sum(when(col("extension_col1").isNull, 1).otherwise(0)).over(w1)
).withColumn(
"No_Of_Extensions",
when(col("extension_col1").isNull, 0).otherwise(row_number().over(w2))
).drop("grp").orderBy(desc("col1"))
result.show

//+-----+--------------+----------------+
//| col1|extension_col1|No_Of_Extensions|
//+-----+--------------+----------------+
//|99985|         94904|               5|
//|94904|         89884|               4|
//|89884|         88592|               3|
//|88592|         86367|               2|
//|86367|         84121|               1|
//|84121|          null|               0|
//+-----+--------------+----------------+

请注意,第一个sum使用的是非分区窗口,因此所有数据都将移动到一个分区中,因此可能会影响性能。


Spark SQL等效查询:

SELECT col1, 
extension_col1, 
case when extension_col1 is null then 0 else row_number() over(partition by grp order by col1) end as No_Of_Extensions
FROM  (
SELECT *, 
sum(case when extension_col1 is null then 1 else 0 end) over(order by col1 desc) as grp
FROM df
)
ORDER BY col1 desc

blackbishop的替代方案,因为我认为数据可能不会总是被排序,因此进行一些替代处理。我喜欢条件求和,但这里不适用。

老实说,Spark在规模上是一个糟糕的用例,因为我也无法绕过它分区方面或者作为其他答案状态。但是在更新的Spark上增加了分区大小在本例中,"列表"可能很长。

第1部分-生成数据

// 1. Generate data.
val df = Seq(( Some(2345), Some(22246) ), ( Some(22246), Some(2134) ), ( Some(2134), Some(2091) ), (Some(2091), None) ,
( Some(1234), Some(1111) ), ( Some(1111), None )
).toDF("col1" ,"extCol1")

第2部分-实际处理

//2. Narrow transform, add position in dataset as values nay not awlays be desc or asc.
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField,StructType,IntegerType, ArrayType, LongType}
val newSchema = StructType(df.schema.fields ++ Array(StructField("rowid", LongType, false)))
val rdd = df.rdd.zipWithIndex
val df2 = spark.createDataFrame(rdd.map{ case (row, index) => Row.fromSeq(row.toSeq ++ Array(index))}, newSchema)  // Some cost

//3. Make groupings in record ranges. Cannot avoid the single partition aspects, so this only works if we can do it with data that can fit into a single partition. At scale one would 
//   not be able to do this really unless some grouping characteristic. 
val dfg = df2.filter(df2("extCol1").isNull)
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
val winSpec1 = Window.orderBy(asc("rowid"))
val dfg2 = dfg.withColumn("prev_rowid_tmp", lag("rowid", 1, -1).over(winSpec1))
.withColumn("rowidFrom", $"prev_rowid_tmp" + 1)
.drop("prev_rowid_tmp")
.drop("extCol1")
.withColumnRenamed("rowid","rowidTo")
//4. Apply grouping of ranges of rows to data.
val df3 = df2.as("df2").join(dfg2.as("dfg2"), 
$"df2.rowid" >= $"dfg2.rowidFrom" && $"df2.rowid" <= $"dfg2.rowidTo", "inner")             
//5. Do the calcs.
val res = df3.withColumn("numExtensions", $"rowidTo" - $"rowid") 
res.select("df2.col1", "extCol1", "numExtensions").show(false)

退货:

+-----+-------+-------------+
|col1 |extCol1|numExtensions|
+-----+-------+-------------+
|2345 |22246  |3            |
|22246|2134   |2            |
|2134 |2091   |1            |
|2091 |null   |0            |
|1234 |1111   |1            |
|1111 |null   |0            |
+-----+-------+-------------+

对于数据表的第一列已经排序的场景,每次当上一条记录中第二列的值为null时,都会创建一个新的组,并根据具体要求在每个组中添加一个数字列。尝试在SQL中实现该过程是一件非常麻烦的事情。您需要首先根据需要创建行号和标记列,然后根据标记列和行号执行分组。一种常见的替代方法是从数据库中提取原始数据,并用Python或SPL进行处理。SPL,开源Java包,更容易集成到Java程序中,并生成更简单的代码。它只用两行代码来表达算法:

1 =MYSQL.query("从t4中选择*"(
2 =A1.group@i(#2[-1]==null(.run(len=~.len((,~=~.derive(len-#:No_Of_Extensions((.concon((

相关内容

  • 没有找到相关文章

最新更新