我有一个很大的PySpark DataFrame,我想像下面的例子一样操作它。我认为可视化它比描述它更容易。因此,为了便于说明,让我们取一个简单的DataFramedf
:
df.show()
+----------+-----------+-----------+
| series | timestamp | value |
+----------+-----------+-----------+
| ID1 | t1 | value1_1 |
| ID1 | t2 | value2_1 |
| ID1 | t3 | value3_1 |
| ID2 | t1 | value1_2 |
| ID2 | t2 | value2_2 |
| ID2 | t3 | value3_2 |
| ID3 | t1 | value1_3 |
| ID3 | t2 | value2_3 |
| ID3 | t3 | value3_3 |
+----------+-----------+-----------+
在上述DataFrame中,列series
中包含的三个唯一值(即ID1
、ID2
和ID3
(中的每一个都具有同时模拟出现的相应值(在列values
下((即列timestamp
中的相同条目(。
从这个DataFrame中,我想得到一个转换,它最终得到以下DataFrame,比如results
。可以看出,DataFrame的大小发生了变化,甚至列也根据原始DataFrame的条目进行了重命名。
result.show()
+-----------+-----------+-----------+-----------+
| timestamp | ID1 | ID2 | ID3 |
+-----------+-----------+-----------+-----------+
| t1 | value1_1 | value1_2 | value1_3 |
| t2 | value2_1 | value2_2 | value2_3 |
| t3 | value3_1 | value3_2 | value3_3 |
+-----------+-----------+-----------+-----------+
result
中列的顺序是任意的,不应影响最终答案。该说明性示例仅包含series
中的三个唯一值(即ID1
、ID2
和ID3
(。理想情况下,我想写一段代码,自动检测series
中的唯一值,从而生成一个新的对应列。有人知道我从哪里开始吗?我尝试过按timestamp
分组,然后使用聚合函数collect_set
收集一组不同的series
和value
,但没有成功:(
非常感谢!
马里安萨斯
只是一个简单的支点:
import pyspark.sql.functions as F
result = df.groupBy('timestamp').pivot('series').agg(F.first('value'))
确保df
中的每一行都是不同的;否则重复的条目可以被静默地消除重复。
根据mck的回答,我找到了一种提高pivot
性能的方法。pivot
是一个非常昂贵的操作,因此,对于ward上的Spark 2.0,建议提供列数据(如果已知(作为函数的参数,如下代码所示。这将提高DataFrames代码的性能,该代码比本问题中提出的说明性代码要大得多。假设series
的值事先已知,我们可以使用:
import pyspark.sql.functions as F
series_list = ('ID1', 'ID2', 'ID3')
result = df.groupBy('timestamp').pivot('series', series_list).agg(F.first('value'))
result.show()
+---------+--------+--------+--------+
|timestamp| ID1| ID2| ID3|
+---------+--------+--------+--------+
| t1|value1_1|value1_2|value1_3|
| t2|value2_1|value2_2|value2_3|
| t3|value3_1|value3_2|value3_3|
+---------+--------+--------+--------+