Actually I am working on pyspark code. My dataframe is
+-------+--------+--------+--------+--------+
|element|collect1|collect2|collect3|collect4|
+-------+--------+--------+--------+--------+
|A1 | 1.02 | 2.6 | 5.21 | 3.6 |
|A2 | 1.61 | 2.42 | 4.88 | 6.08 |
|B1 | 1.66 | 2.01 | 5.0 | 4.3 |
|C2 | 2.01 | 1.85 | 3.42 | 4.44 |
+-------+--------+--------+--------+--------+
我需要通过聚合所有 collectX 列来找到每个元素的平均值和标准dev。最终结果应如下所示。
+-------+--------+--------+
|element|mean |stddev |
+-------+--------+--------+
|A1 | 3.11 | 1.76 |
|A2 | 3.75 | 2.09 |
|B1 | 3.24 | 1.66 |
|C2 | 2.93 | 1.23 |
+-------+--------+--------+
下面的代码分解了各个列的所有平均值df.groupBy("element"(.mean((.show((.是否可以汇总所有列,而不是对每一列执行操作?
+-------+-------------+-------------+-------------+-------------+
|element|avg(collect1)|avg(collect2)|avg(collect3)|avg(collect4)|
+-------+-------------+-------------+-------------+-------------+
|A1 | 1.02 | 2.6 | 5.21 | 3.6 |
|A2 | 1.61 | 2.42 | 4.88 | 6.08 |
|B1 | 1.66 | 2.01 | 5.0 | 4.3 |
|C2 | 2.01 | 1.85 | 3.42 | 4.44 |
+-------+-------------+-------------+-------------+-------------+
我尝试使用描述函数,因为它具有完整的聚合函数,但仍显示为单独的列df.groupBy("element"(.mean((.describe((.show((
谢谢
Spark 允许您收集每列的各种统计信息。您正在尝试计算每行的统计信息。在这种情况下,您可以使用 udf
.下面是一个示例:D
$ pyspark
>>> from pyspark.sql.types import DoubleType
>>> from pyspark.sql.functions import array, udf
>>>
>>> mean = udf(lambda v: sum(v) / len(v), DoubleType())
>>> df = sc.parallelize([['A1', 1.02, 2.6, 5.21, 3.6], ['A2', 1.61, 2.42, 4.88, 6.08]]).toDF(['element', 'collect1', 'collect2', 'collect3', 'collect4'])
>>> df.show()
+-------+--------+--------+--------+--------+
|element|collect1|collect2|collect3|collect4|
+-------+--------+--------+--------+--------+
| A1| 1.02| 2.6| 5.21| 3.6|
| A2| 1.61| 2.42| 4.88| 6.08|
+-------+--------+--------+--------+--------+
>>> df.select('element', mean(array(df.columns[1:])).alias('mean')).show()
+-------+------+
|element| mean|
+-------+------+
| A1|3.1075|
| A2|3.7475|
+-------+------+
您是否尝试过将列加在一起并可能除以 4?
SELECT avg((collect1 + collect2 + collect3 + collect4) / 4),
stddev((collect1 + collect2 + collect3 + collect4) / 4)
这不会完全按照你想要的去做,但要明白这个想法。
不确定您的语言,但如果您对硬编码不满意,可以随时即时构建查询:
val collectColumns = df.columns.filter(_.startsWith("collect"))
val stmnt = "SELECT avg((" + collectColumns.mkString(" + ") + ") / " + collectColumns.length + "))"
你明白了。