How to expand the cardinality of a column in a Spark dataframe, like in pandas



For example, a naive way of doing this in pandas would be something like:

d = df.set_index(['date', 'a']).unstack('a').stack('a', dropna=False)
d.shape, df.shape
(10, 4), (30, 4)

i.e. if you have 3 unique dates and 10 unique values in 'a' (expanding to 10 x 3).

Update: using the example provided by Maff, here is what you might do in pandas.

In [260]: from datetime import datetime
In [261]: df = spark.createDataFrame(
     ...:     [[datetime(y,1,1), a, b] for y, a, b
     ...:      in zip(range(2000, 2010), range(10), range(10, 20))],
     ...:     ['date', 'a', 'b'])
     ...:
     ...:
In [262]: df.show()
+-------------------+---+---+
|               date|  a|  b|
+-------------------+---+---+
|2000-01-01 00:00:00|  0| 10|
|2001-01-01 00:00:00|  1| 11|
|2002-01-01 00:00:00|  2| 12|
|2003-01-01 00:00:00|  3| 13|
|2004-01-01 00:00:00|  4| 14|
|2005-01-01 00:00:00|  5| 15|
|2006-01-01 00:00:00|  6| 16|
|2007-01-01 00:00:00|  7| 17|
|2008-01-01 00:00:00|  8| 18|
|2009-01-01 00:00:00|  9| 19|
+-------------------+---+---+

In [263]: d = df.toPandas().set_index(['date', 'a'])
In [264]: d
Out[264]:
               b
date       a
2000-01-01 0  10
2001-01-01 1  11
2002-01-01 2  12
2003-01-01 3  13
2004-01-01 4  14
2005-01-01 5  15
2006-01-01 6  16
2007-01-01 7  17
2008-01-01 8  18
2009-01-01 9  19
In [265]: d = d.reindex(pd.MultiIndex.from_product(d.index.levels))
In [266]: d.shape
Out[266]: (100, 1)
In [267]: d.head()
Out[267]:
                 b
2000-01-01 0  10.0
           1   NaN
           2   NaN
           3   NaN
           4   NaN

  • unstack / pivot in pandas is equivalent to pivot in pyspark
  • stack / melt in pandas is equivalent to explode in pyspark

Let's start with a sample dataframe:

from datetime import datetime
df = spark.createDataFrame(
    [[datetime(y,1,1), a, b] for y, a, b
     in zip(range(2000, 2010), range(10), range(10, 20))], 
    ['date', 'a', 'b'])
df.show()
    +-------------------+---+---+
    |               date|  a|  b|
    +-------------------+---+---+
    |2000-01-01 00:00:00|  0| 10|
    |2001-01-01 00:00:00|  1| 11|
    |2002-01-01 00:00:00|  2| 12|
    |2003-01-01 00:00:00|  3| 13|
    |2004-01-01 00:00:00|  4| 14|
    |2005-01-01 00:00:00|  5| 15|
    |2006-01-01 00:00:00|  6| 16|
    |2007-01-01 00:00:00|  7| 17|
    |2008-01-01 00:00:00|  8| 18|
    |2009-01-01 00:00:00|  9| 19|
    +-------------------+---+---+

1. unstack

To unstack the dataframe:

import pyspark.sql.functions as psf
df_unstack = df.groupBy("date").pivot("a").agg(psf.max("b"))
df_unstack.show()
    +-------------------+----+----+----+----+----+----+----+----+----+----+
    |               date|   0|   1|   2|   3|   4|   5|   6|   7|   8|   9|
    +-------------------+----+----+----+----+----+----+----+----+----+----+
    |2003-01-01 00:00:00|null|null|null|  13|null|null|null|null|null|null|
    |2004-01-01 00:00:00|null|null|null|null|  14|null|null|null|null|null|
    |2009-01-01 00:00:00|null|null|null|null|null|null|null|null|null|  19|
    |2001-01-01 00:00:00|null|  11|null|null|null|null|null|null|null|null|
    |2006-01-01 00:00:00|null|null|null|null|null|null|  16|null|null|null|
    |2008-01-01 00:00:00|null|null|null|null|null|null|null|null|  18|null|
    |2005-01-01 00:00:00|null|null|null|null|null|  15|null|null|null|null|
    |2000-01-01 00:00:00|  10|null|null|null|null|null|null|null|null|null|
    |2007-01-01 00:00:00|null|null|null|null|null|null|null|  17|null|null|
    |2002-01-01 00:00:00|null|null|  12|null|null|null|null|null|null|null|
    +-------------------+----+----+----+----+----+----+----+----+----+----+
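A side note on the pivot call above: when the distinct values of the pivot column are already known, they can be passed explicitly as the second argument of pivot, which spares Spark an extra pass over the data to compute them. A minimal sketch, assuming the values of "a" are exactly 0 through 9 as in this example:

# Same pivot as above, but with the pivot values given explicitly,
# so Spark does not run a separate job to collect the distinct values of "a".
df_unstack = df.groupBy("date").pivot("a", values=list(range(10))).agg(psf.max("b"))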

2. stack

from itertools import chain
df_stack = df_unstack.select(
        'date', 
        psf.explode(psf.create_map(
            list(chain(*[(psf.lit(c), psf.col(c)) for c in df_unstack.columns if c != "date"]))
        )).alias("a", "b")) \
    .filter(~psf.isnull("b"))
df_stack.show()
    +-------------------+---+---+
    |               date|  a|  b|
    +-------------------+---+---+
    |2003-01-01 00:00:00|  3| 13|
    |2004-01-01 00:00:00|  4| 14|
    |2009-01-01 00:00:00|  9| 19|
    |2001-01-01 00:00:00|  1| 11|
    |2006-01-01 00:00:00|  6| 16|
    |2008-01-01 00:00:00|  8| 18|
    |2005-01-01 00:00:00|  5| 15|
    |2000-01-01 00:00:00|  0| 10|
    |2007-01-01 00:00:00|  7| 17|
    |2002-01-01 00:00:00|  2| 12|
    +-------------------+---+---+

If the last snippet seems a bit laborious, it is because explode is really meant for ArrayType or MapType columns rather than for a list of separate columns, which makes it more verbose here than pivot combined with .groupBy().agg(psf.collect_set).
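To see what explode is really designed for, here is a minimal sketch (the names df_map and df_exploded are illustrative, not from the original answer) that first packs each (a, b) pair into a single MapType column and then explodes it back into two columns:

import pyspark.sql.functions as psf

# Pack the (a, b) pair of each row into a single MapType column...
df_map = df.select('date', psf.create_map(psf.col('a'), psf.col('b')).alias('ab'))
df_map.printSchema()  # ab is a map column (map<bigint,bigint> with this input schema)

# ...then explode it: one output row per map entry, with key and value split into "a" and "b".
df_exploded = df_map.select('date', psf.explode('ab').alias('a', 'b'))
df_exploded.show()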

3. Cartesian product: self join or cross join

What you are looking for is not stacking or unstacking (pivoting, exploding, ...) the dataframe, but a way to build the Cartesian product of the dataframe with itself. In pyspark you can use crossJoin (if spark >= 2, otherwise join without any join key).

df_left = df.alias('left')
df_right = df.alias('right')
df_cross = df_left.crossJoin(df_right) \
    .select(
        psf.col('left.date'), 
        psf.col('right.a'), 
        psf.when(psf.col('left.date') == psf.col('right.date'), psf.col('left.b')).otherwise(None).alias('b'))
df_cross.sort('date', 'a').show()
    +-------------------+---+----+
    |               date|  a|   b|
    +-------------------+---+----+
    |2000-01-01 00:00:00|  0|  10|
    |2000-01-01 00:00:00|  1|null|
    |2000-01-01 00:00:00|  2|null|
    |2000-01-01 00:00:00|  3|null|
    |2000-01-01 00:00:00|  4|null|
    |2000-01-01 00:00:00|  5|null|
    |2000-01-01 00:00:00|  6|null|
    |2000-01-01 00:00:00|  7|null|
    |2000-01-01 00:00:00|  8|null|
    |2000-01-01 00:00:00|  9|null|
    |2001-01-01 00:00:00|  0|null|
    |2001-01-01 00:00:00|  1|  11|
    |2001-01-01 00:00:00|  2|null|
    |2001-01-01 00:00:00|  3|null|
    |2001-01-01 00:00:00|  4|null|
    |2001-01-01 00:00:00|  5|null|
    |2001-01-01 00:00:00|  6|null|
    |2001-01-01 00:00:00|  7|null|
    |2001-01-01 00:00:00|  8|null|
    |2001-01-01 00:00:00|  9|null|
    +-------------------+---+----+
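As mentioned above, crossJoin requires Spark >= 2; on older versions the same Cartesian product can be obtained with join and no join key. A minimal sketch of that fallback, reusing the aliases from the previous snippet (df_cross_old is an illustrative name):

# On Spark < 2, a join without any join condition already yields the Cartesian product.
df_cross_old = df_left.join(df_right) \
    .select(
        psf.col('left.date'),
        psf.col('right.a'),
        psf.when(psf.col('left.date') == psf.col('right.date'), psf.col('left.b')).otherwise(None).alias('b'))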

Here is the pandas equivalent, merging the dataframe with itself:

import numpy as np
df_pd = df.toPandas()
df_pd.loc[:, 'key'] = 1
df_pd_cross = df_pd.merge(df_pd, on='key')
df_pd_cross = df_pd_cross \
    .assign(b=np.where(df_pd_cross.date_x==df_pd_cross.date_y, df_pd_cross.b_x, None)) \
    .rename(columns={'date_x': 'date', 'a_y': 'a'})[['date', 'a', 'b']]
df_pd_cross.sort_values(['date', 'a']).head(20)
    +-----+-------------+----+------+
    |     |    date     | a  |  b   |
    +-----+-------------+----+------+
    |  0  | 2000-01-01  | 0  | 10   |
    |  1  | 2000-01-01  | 1  | None |
    |  2  | 2000-01-01  | 2  | None |
    |  3  | 2000-01-01  | 3  | None |
    |  4  | 2000-01-01  | 4  | None |
    |  5  | 2000-01-01  | 5  | None |
    |  6  | 2000-01-01  | 6  | None |
    |  7  | 2000-01-01  | 7  | None |
    |  8  | 2000-01-01  | 8  | None |
    |  9  | 2000-01-01  | 9  | None |
    | 10  | 2001-01-01  | 0  | None |
    | 11  | 2001-01-01  | 1  | 11   |
    | 12  | 2001-01-01  | 2  | None |
    | 13  | 2001-01-01  | 3  | None |
    | 14  | 2001-01-01  | 4  | None |
    | 15  | 2001-01-01  | 5  | None |
    | 16  | 2001-01-01  | 6  | None |
    | 17  | 2001-01-01  | 7  | None |
    | 18  | 2001-01-01  | 8  | None |
    | 19  | 2001-01-01  | 9  | None |
    +-----+-------------+----+------+
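On recent pandas versions (1.2 and later), the dummy key column is not needed, since merge supports how='cross' directly. A minimal sketch of that variant (df_pd_cross2 is an illustrative name):

# Requires pandas >= 1.2, where merge(..., how='cross') was introduced.
df_pd = df.toPandas()
df_pd_cross2 = df_pd.merge(df_pd, how='cross')
df_pd_cross2 = df_pd_cross2 \
    .assign(b=np.where(df_pd_cross2.date_x == df_pd_cross2.date_y, df_pd_cross2.b_x, None)) \
    .rename(columns={'date_x': 'date', 'a_y': 'a'})[['date', 'a', 'b']]
df_pd_cross2.sort_values(['date', 'a']).head(20)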
