For example, a silly way of doing this in pandas would be something like
d = df.set_index(['date', 'a']).unstack('a').stack('a', dropna=False)
df.shape, d.shape
(10, 4), (30, 4)
if you have 3 unique dates across 10 unique values of 'a' (expanding 10 rows to 10 x 3).
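For reference, here is a minimal self-contained sketch of that round-trip, assuming a hypothetical toy frame with 3 distinct dates and 10 distinct values of 'a' (so the shapes differ slightly from the ones above, and recent pandas versions may warn about the dropna argument):
import pandas as pd
# hypothetical toy frame: 3 distinct dates, 10 distinct values of 'a'
df = pd.DataFrame({
    'date': pd.to_datetime(['2000-01-01', '2001-01-01', '2002-01-01'] * 4)[:10],
    'a': range(10),
    'b': range(10, 20),
})
# the unstack/stack round-trip keeps one row per (date, a) combination
d = df.set_index(['date', 'a']).unstack('a').stack('a', dropna=False)
print(df.shape, d.shape)   # (10, 3) and (30, 1): 3 dates x 10 values of 'a'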
Update: using the example provided by Maff, here is what you might do in pandas.
In [260]: import pandas as pd; from datetime import datetime
In [261]: df = spark.createDataFrame(
...: [[datetime(y,1,1), a, b] for y, a, b
...: in zip(range(2000, 2010), range(10), range(10, 20))],
...: ['date', 'a', 'b'])
...:
...:
In [262]: df.show()
+-------------------+---+---+
| date| a| b|
+-------------------+---+---+
|2000-01-01 00:00:00| 0| 10|
|2001-01-01 00:00:00| 1| 11|
|2002-01-01 00:00:00| 2| 12|
|2003-01-01 00:00:00| 3| 13|
|2004-01-01 00:00:00| 4| 14|
|2005-01-01 00:00:00| 5| 15|
|2006-01-01 00:00:00| 6| 16|
|2007-01-01 00:00:00| 7| 17|
|2008-01-01 00:00:00| 8| 18|
|2009-01-01 00:00:00| 9| 19|
+-------------------+---+---+
In [263]: d = df.toPandas().set_index(['date', 'a'])
In [264]: d
Out[264]:
b
date a
2000-01-01 0 10
2001-01-01 1 11
2002-01-01 2 12
2003-01-01 3 13
2004-01-01 4 14
2005-01-01 5 15
2006-01-01 6 16
2007-01-01 7 17
2008-01-01 8 18
2009-01-01 9 19
In [265]: d = d.reindex(pd.MultiIndex.from_product(d.index.levels))
In [266]: d.shape
Out[266]: (100, 1)
In [267]: d.head()
Out[267]:
b
2000-01-01 0 10.0
1 NaN
2 NaN
3 NaN
4 NaN
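If you then want the full grid back as ordinary columns (or as a Spark DataFrame again), a possible follow-up to the session above, assuming d, spark and pd as defined there, is:
# turn the reindexed MultiIndex back into ordinary columns
full = d.reset_index()
full.columns = ['date', 'a', 'b']      # name the levels explicitly
# and back to Spark if needed; 'b' becomes a double column with NaN for missing pairs
df_full = spark.createDataFrame(full)
df_full.count()                        # 100 rows: the full 10 x 10 (date, a) grid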
- unstack or pivot in pandas is the equivalent of pivot in pyspark
- stack or melt in pandas is the equivalent of explode in pyspark
Let's start with a sample dataframe:
from datetime import datetime
df = spark.createDataFrame(
[[datetime(y,1,1), a, b] for y, a, b
in zip(range(2000, 2010), range(10), range(10, 20))],
['date', 'a', 'b'])
df.show()
+-------------------+---+---+
| date| a| b|
+-------------------+---+---+
|2000-01-01 00:00:00| 0| 10|
|2001-01-01 00:00:00| 1| 11|
|2002-01-01 00:00:00| 2| 12|
|2003-01-01 00:00:00| 3| 13|
|2004-01-01 00:00:00| 4| 14|
|2005-01-01 00:00:00| 5| 15|
|2006-01-01 00:00:00| 6| 16|
|2007-01-01 00:00:00| 7| 17|
|2008-01-01 00:00:00| 8| 18|
|2009-01-01 00:00:00| 9| 19|
+-------------------+---+---+
1. unstack
Unstack the dataframe:
import pyspark.sql.functions as psf
df_unstack = df.groupBy("date").pivot("a").agg(psf.max("b"))
df_unstack.show()
+-------------------+----+----+----+----+----+----+----+----+----+----+
| date| 0| 1| 2| 3| 4| 5| 6| 7| 8| 9|
+-------------------+----+----+----+----+----+----+----+----+----+----+
|2003-01-01 00:00:00|null|null|null| 13|null|null|null|null|null|null|
|2004-01-01 00:00:00|null|null|null|null| 14|null|null|null|null|null|
|2009-01-01 00:00:00|null|null|null|null|null|null|null|null|null| 19|
|2001-01-01 00:00:00|null| 11|null|null|null|null|null|null|null|null|
|2006-01-01 00:00:00|null|null|null|null|null|null| 16|null|null|null|
|2008-01-01 00:00:00|null|null|null|null|null|null|null|null| 18|null|
|2005-01-01 00:00:00|null|null|null|null|null| 15|null|null|null|null|
|2000-01-01 00:00:00| 10|null|null|null|null|null|null|null|null|null|
|2007-01-01 00:00:00|null|null|null|null|null|null|null| 17|null|null|
|2002-01-01 00:00:00|null|null| 12|null|null|null|null|null|null|null|
+-------------------+----+----+----+----+----+----+----+----+----+----+
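If the distinct values of a are known in advance, you can pass them to pivot explicitly, which spares Spark an extra pass over the data to compute them:
# same result, with the pivot values given up front
df_unstack = df.groupBy("date").pivot("a", values=list(range(10))).agg(psf.max("b"))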
2. stack
from itertools import chain
df_stack = df_unstack.select(
    'date',
    psf.explode(psf.create_map(
        list(chain(*[(psf.lit(c), psf.col(c)) for c in df_unstack.columns if c != "date"]))
    )).alias("a", "b")
).filter(~psf.isnull("b"))
df_stack.show()
+-------------------+---+---+
| date| a| b|
+-------------------+---+---+
|2003-01-01 00:00:00| 3| 13|
|2004-01-01 00:00:00| 4| 14|
|2009-01-01 00:00:00| 9| 19|
|2001-01-01 00:00:00| 1| 11|
|2006-01-01 00:00:00| 6| 16|
|2008-01-01 00:00:00| 8| 18|
|2005-01-01 00:00:00| 5| 15|
|2000-01-01 00:00:00| 0| 10|
|2007-01-01 00:00:00| 7| 17|
|2002-01-01 00:00:00| 2| 12|
+-------------------+---+---+
If this last snippet seems a bit laborious, it's because explode is really meant for ArrayType or MapType columns rather than for a list of separate columns; compared with pivot, it goes more naturally with a .groupBy().agg(psf.collect_set) that produces such a column.
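To see explode used on the kind of column it is actually designed for, here is a small sketch that first collapses b into an ArrayType column per date with collect_set and then explodes it back:
# collapse b into an array per date, then explode the array (explode's natural target)
df_arrays = df.groupBy('date').agg(psf.collect_set('b').alias('bs'))
df_arrays.select('date', psf.explode('bs').alias('b')).show()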
3. Cartesian product, self join or cross join
What you are looking for is not stacking or unstacking (pivoting, exploding...) the dataframe, but making a Cartesian product of it with itself. In pyspark you can use crossJoin (if spark >= 2, otherwise a join without any join key).
left = df.alias('left')
right = df.alias('right')
df_cross = left.crossJoin(right).select(
    psf.col('left.date'),
    psf.col('right.a'),
    psf.when(psf.col('left.date') == psf.col('right.date'), psf.col('left.b'))
       .otherwise(None).alias('b'))
df_cross.sort('date', 'a').show()
+-------------------+---+----+
| date| a| b|
+-------------------+---+----+
|2000-01-01 00:00:00| 0| 10|
|2000-01-01 00:00:00| 1|null|
|2000-01-01 00:00:00| 2|null|
|2000-01-01 00:00:00| 3|null|
|2000-01-01 00:00:00| 4|null|
|2000-01-01 00:00:00| 5|null|
|2000-01-01 00:00:00| 6|null|
|2000-01-01 00:00:00| 7|null|
|2000-01-01 00:00:00| 8|null|
|2000-01-01 00:00:00| 9|null|
|2001-01-01 00:00:00| 0|null|
|2001-01-01 00:00:00| 1| 11|
|2001-01-01 00:00:00| 2|null|
|2001-01-01 00:00:00| 3|null|
|2001-01-01 00:00:00| 4|null|
|2001-01-01 00:00:00| 5|null|
|2001-01-01 00:00:00| 6|null|
|2001-01-01 00:00:00| 7|null|
|2001-01-01 00:00:00| 8|null|
|2001-01-01 00:00:00| 9|null|
+-------------------+---+----+
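An equivalent way to build the same grid, closer in spirit to the pandas reindex above, is to cross join the distinct keys and left join the values back (a sketch using the same column names):
# full (date, a) grid from the distinct keys, with the observed values joined back
grid = df.select('date').distinct().crossJoin(df.select('a').distinct())
df_full = grid.join(df, on=['date', 'a'], how='left')
df_full.sort('date', 'a').show()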
Here is the pandas equivalent, merging the dataframe with itself:
import numpy as np
df_pd = df.toPandas()
df_pd.loc[:, 'key'] = 1
df_pd_cross = df_pd.merge(df_pd, on='key')
df_pd_cross = df_pd_cross \
    .assign(b=np.where(df_pd_cross.date_x == df_pd_cross.date_y, df_pd_cross.b_x, None)) \
    .rename(columns={'date_x': 'date', 'a_y': 'a'})[['date', 'a', 'b']]
df_pd_cross.sort_values(['date', 'a']).head(20)
+-----+-------------+----+------+
| | date | a | b |
+-----+-------------+----+------+
| 0 | 2000-01-01 | 0 | 10 |
| 1 | 2000-01-01 | 1 | None |
| 2 | 2000-01-01 | 2 | None |
| 3 | 2000-01-01 | 3 | None |
| 4 | 2000-01-01 | 4 | None |
| 5 | 2000-01-01 | 5 | None |
| 6 | 2000-01-01 | 6 | None |
| 7 | 2000-01-01 | 7 | None |
| 8 | 2000-01-01 | 8 | None |
| 9 | 2000-01-01 | 9 | None |
| 10 | 2001-01-01 | 0 | None |
| 11 | 2001-01-01 | 1 | 11 |
| 12 | 2001-01-01 | 2 | None |
| 13 | 2001-01-01 | 3 | None |
| 14 | 2001-01-01 | 4 | None |
| 15 | 2001-01-01 | 5 | None |
| 16 | 2001-01-01 | 6 | None |
| 17 | 2001-01-01 | 7 | None |
| 18 | 2001-01-01 | 8 | None |
| 19 | 2001-01-01 | 9 | None |
+-----+-------------+----+------+
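With pandas >= 1.2 the dummy key column is no longer needed, because merge accepts how='cross'; a variant of the snippet above (numpy imported as np, as before):
# cross merge without the helper 'key' column (pandas >= 1.2)
df_pd = df.toPandas()
df_pd_cross = (
    df_pd.merge(df_pd, how='cross', suffixes=('_x', '_y'))
         .assign(b=lambda t: np.where(t.date_x == t.date_y, t.b_x, None))
         .rename(columns={'date_x': 'date', 'a_y': 'a'})[['date', 'a', 'b']]
)
df_pd_cross.sort_values(['date', 'a']).head(20)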