I'm running into a performance problem when aggregating a double array by looking up an index array. Here is what I mean. The original DataFrame looks like this:

Original DataFrame
| id | prop1 | values |
|----|--------------|-------------------------|
| 1 | [2,5,1,3] | [ 0.1, 0.5, 0.7, 0.8] |
| 2 | [2,1] | [ 0.2, 0.3 ] |
| 1 | [1,5] | [ 0.4, 0.3 ] |
| 2 | [3,2] | [ 0.0, 0.1 ] |
So the second column, prop1, is an int array whose values lie in the range 1 to 5, but they are not in order and numbers can be missing from the array.
The prop1 int array acts as an index into the values double array. What I mean is that the first row, after exploding, looks like this:

| id | prop1 | values |
|----|-------|--------|
| 1 | 2 | 0.1 |
| 1 | 5 | 0.5 |
| 1 | 1 | 0.7 |
| 1 | 3 | 0.8 |
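For reference, a minimal sketch of that explode step (assuming the column names shown above; arrays_zip is available in Spark 2.4 and keeps prop1[i] paired with values[i]):

import org.apache.spark.sql.functions._

// pair up prop1[i] with values[i], then explode to one pair per row
// (field names assume arrays_zip names the struct fields after the source columns)
val exploded = originalDf
  .withColumn("zipped", explode(arrays_zip(col("prop1"), col("values"))))
  .select(col("id"), col("zipped.prop1").as("prop1"), col("zipped.values").as("values"))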
Finally, the question: I need to aggregate the values of the double array by looking up the index array together with the id column, so the result should be (for example, for id 1 the index 5 appears with 0.5 and 0.3, which sums to 0.8):
| id | prop1 | values |
|----|----------------|--------------------------|
| 1 | [2,5,1,3] | [ 0.1, 0.8, 1.1, 0.8 ] |
| 2 | [2,1,3] | [ 0.3, 0.3, 0.0 ] |
Below is the code I am using to extract the values by index and pivot them, right before merging them back into arrays:
// dummy dataframe holding the sequence 1 to 5; the upper end is dynamic and can extend up to 300k
var df = (1 to 5).toDF("prop1")
// joining the original (exploded) DF on the prop1 column
var stgDf = originalDf.join(df, originalDf.col("prop1") === df.col("prop1"), "inner")
// pivoting the values by index
var pivotDf = stgDf.groupBy("id")
  .pivot("prop1").agg(first("values"))
// now aggregating the pivoted values by id (skipping the id column itself)
var exprs = pivotDf.columns.filter(_ != "id").map(sum(_))
var aggDf = pivotDf.groupBy("id").agg(exprs.head, exprs.tail: _*)
// then grouping back into arrays by id (see the sketch below)
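That last step is only described by the comment; one rough way the regrouping could look (a sketch, assuming a hypothetical sumDf that already holds one row per (id, prop1) with the summed value, i.e. skipping the pivot):

import org.apache.spark.sql.functions._

// sumDf is hypothetical: columns id, prop1 (int), values (double), one row per (id, prop1)
val regrouped = sumDf
  .groupBy("id")
  .agg(collect_list(struct(col("prop1"), col("values"))).as("pairs"))
  .select(
    col("id"),
    col("pairs.prop1").as("prop1"),    // array of indices per id
    col("pairs.values").as("values")   // array of summed doubles, in the same order
  )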
For this solution I explode prop1 and values. It works with a few rows, but in the real problem both array columns can hold more than 500k values each, and the number of rows per id can exceed 30 million.
It would be great if someone could take a look and help. I'm building the application in Scala on Spark 2.4.
thanks in advance
This answer uses Spark 3.x, not v2.4. Upgrade, as doing it in 2.4 is too hard.
Some serious data wrangling!
There may be a better way, but this one is scalable. It may well need a lot of partitions.
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Column
import spark.implicits._ // needed for the $"col" and 'col column syntax outside the shell
val arrayStructureData = Seq(
Row(1,List(2,5,1,3),List(0.1, 0.5, 0.7, 0.8)),
Row(2,List(2,1),List(0.2, 0.3)),
Row(1,List(1,5),List(0.4, 0.3)),
Row(2,List(3,2),List(0.0, 0.1))
)
// Just a single StructType for the Row
val arrayStructureSchema = new StructType()
.add("id",IntegerType)
.add("prop1", ArrayType(IntegerType))
.add("values", ArrayType(DoubleType))
val df = spark.createDataFrame(spark.sparkContext.parallelize(arrayStructureData),arrayStructureSchema)
df.printSchema()
df.show()
val df2 = df.withColumn(
"jCols",
zip_with(
col("prop1"),
col("values"),
// This should really be a struct, but an array is used here (which also promotes the int indices to double). Note: zip_with is not available in v2.4!
(left: Column, right: Column) => array(left, right)
)
).drop('prop1).drop('values)
df2.show(false)
df2.printSchema()
// gather every list of (index, value) pairs per id
val df3 = df2.groupBy("id").agg(collect_list("jCols").as("jCols"))
df3.printSchema()
df3.show(false)
// flatten the list of lists into a single array of pairs per id
val df4 = df3.select($"id", flatten($"jCols").as("jCols"))
df4.show(false)
df4.printSchema()
// explode back to one (index, value) pair per row
val df5 = df4.withColumn("ExjCols", explode($"jCols")).drop("jCols")
df5.show(false)
df5.printSchema()
// split the pair into separate prop1 and values columns
val df6 = df5.select(col("id"), col("ExjCols")(0).as("prop1"), col("ExjCols")(1).as("values"))
df6.show(false)
df6.printSchema()
// sum the values per (id, index)
val df7 = df6.groupBy("id", "prop1").sum("values").toDF("id", "prop1", "values")
df7.show(false)
df7.printSchema()
// re-combine each index and its summed value into a pair
val df8 = df7.withColumn("combined", array($"prop1", $"values"))
df8.show(false)
df8.printSchema()
// collect the pairs back into one array per id
val df9 = df8.groupBy("id").agg(collect_list("combined").as("propN"))
df9.show(false)
df9.printSchema()
// split the pairs into parallel prop1 / values arrays
val res = df9.withColumn("prop1", expr("transform(propN, x -> x[0])")).withColumn("values", expr("transform(propN, x -> x[1])")).drop('propN)
res.show(false)
This returns:
+---+--------------------+-------------------------------+
|id |prop1 |values |
+---+--------------------+-------------------------------+
|1 |[2.0, 5.0, 1.0, 3.0]|[0.1, 0.8, 1.1, 0.8] |
|2 |[2.0, 1.0, 3.0] |[0.30000000000000004, 0.3, 0.0]|
+---+--------------------+-------------------------------+
Not sure at first why the 0.30000000000000004 precision appears, but it is just standard double floating-point rounding: 0.2 + 0.1 cannot be represented exactly in binary. I also corrected the example in the question, which had some errors.
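One optional tweak, continuing from df9 above: since the array(left, right) zip promotes both elements to double, the indices come back as 2.0, 5.0, etc. If they should stay integers, the final transform could cast them back (a sketch, not part of the original answer):

// variant of the last step: cast the index element back to int
val resTyped = df9
  .withColumn("prop1", expr("transform(propN, x -> cast(x[0] as int))"))
  .withColumn("values", expr("transform(propN, x -> x[1])"))
  .drop("propN")
resTyped.show(false)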
I can only assume SO is not as popular these days, as answers take a while to arrive.