Groupby and pivot a PySpark dataframe on many columns



I'm new to Spark and want to pivot a PySpark dataframe on multiple columns. There is one row for each distinct (date, rank) combination. The rows should be flattened so that there is one row per unique date.

import pyspark.sql.functions as F
from datetime import datetime
data = [
    (datetime(2021,8,4,13,0), 1, 22, "a"), (datetime(2021,8,4,13,0), 2, 14, "a"), (datetime(2021,8,4,13,0), 3, 9, "a"), (datetime(2021,8,4,13,0), 4, 7, "a"),
    (datetime(2021,8,4,14,0), 1, 16, "b"), (datetime(2021,8,4,14,0), 2, 21, "b"), (datetime(2021,8,4,14,0), 3, 17, "b"), (datetime(2021,8,4,14,0), 4, 18, "b"),
    (datetime(2021,8,4,15,0), 1, 19, "a"), (datetime(2021,8,4,15,0), 2, 9, "b"), (datetime(2021,8,4,15,0), 3, 10, "c"), (datetime(2021,8,4,15,0), 4, 13, "d"),
]

columns = ["date", "rank", "feat1", "feat2"]
df = spark.createDataFrame(data=data, schema=columns)
df.show(truncate=False)
+-------------------+----+-----+-----+
|date               |rank|feat1|feat2|
+-------------------+----+-----+-----+
|2021-08-04 13:00:00|1   |22   |a    |
|2021-08-04 13:00:00|2   |14   |a    |
|2021-08-04 13:00:00|3   |9    |a    |
|2021-08-04 13:00:00|4   |7    |a    |
|2021-08-04 14:00:00|1   |16   |b    |
|2021-08-04 14:00:00|2   |21   |b    |
|2021-08-04 14:00:00|3   |17   |b    |
|2021-08-04 14:00:00|4   |18   |b    |
|2021-08-04 15:00:00|1   |19   |a    |
|2021-08-04 15:00:00|2   |9    |b    |
|2021-08-04 15:00:00|3   |10   |c    |
|2021-08-04 15:00:00|4   |13   |d    |
+-------------------+----+-----+-----+

The real data has 30+ feature columns, and ranks run from 1 to 100 for each date. Desired output:

+-------------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
|               date|rank1_feat1|rank2_feat1|rank3_feat1|rank4_feat1|rank1_feat2|rank2_feat2|rank3_feat2|rank4_feat2|
+-------------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
|2021-08-04 15:00:00|         19|          9|         10|         13|          a|          b|          c|          d|
|2021-08-04 13:00:00|         22|         14|          9|          7|          a|          a|          a|          a|
|2021-08-04 14:00:00|         16|         21|         17|         18|          b|          b|          b|          b|
+-------------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+

I have a solution that seems to work on my trivial example, but the memory usage is so high that I can't run it on even 1/500th of my data without hitting memory errors.

dfspine = df.select("date").distinct()
for col in df.columns:
    if col not in ["date", "rank"]:
        piv = df.groupby("date").pivot("rank").agg(F.first(col))
        mapping = dict([(pivcol, "rank%s_%s" % (pivcol, col)) for pivcol in piv.columns if pivcol not in ["date"]])
        piv = piv.select([F.col(c).alias(mapping.get(c, c)) for c in piv.columns])
        dfspine = dfspine.join(piv, how="left", on="date")
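
For reference, a single pivot with one aggregation per feature column would avoid the loop of joins entirely. The following is only a sketch of that idea (not verified at scale); it relies on Spark naming the pivoted columns value_alias, e.g. 1_feat1, when more than one aggregation is supplied:

import pyspark.sql.functions as F

# Pivot once on rank, aggregating every feature column in the same pass.
feature_cols = [c for c in df.columns if c not in ("date", "rank")]
piv = df.groupby("date").pivot("rank").agg(*[F.first(c).alias(c) for c in feature_cols])

# Rename "1_feat1" -> "rank1_feat1" etc. to match the naming used above.
piv = piv.select("date", *[F.col(c).alias("rank" + c) for c in piv.columns if c != "date"])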

In pandas this is fairly trivial: it's a melt followed by a pivot. In PySpark it's a bit harder because melt is missing. Luckily, this post has a solution for recreating melt in PySpark, which we can build on.

Pandas version:

import pandas as pd

pdf = pd.DataFrame({'date': ['2021-08-04 13:00:00',
                             '2021-08-04 13:00:00',
                             '2021-08-04 13:00:00',
                             '2021-08-04 13:00:00',
                             '2021-08-04 14:00:00',
                             '2021-08-04 14:00:00',
                             '2021-08-04 14:00:00',
                             '2021-08-04 14:00:00',
                             '2021-08-04 15:00:00',
                             '2021-08-04 15:00:00',
                             '2021-08-04 15:00:00',
                             '2021-08-04 15:00:00'],
                    'rank': [1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4],
                    'feat1': [22, 14, 9, 7, 16, 21, 17, 18, 19, 9, 10, 13],
                    'feat2': ['a', 'a', 'a', 'a', 'b', 'b', 'b', 'b', 'a', 'b', 'c', 'd']})
pdf = pdf.melt(id_vars=['date', 'rank'])
pdf['col'] = 'rank' + pdf['rank'].astype(str) + '_' + pdf['variable']
pdf.pivot(index='date', columns='col', values='value')

PySpark version:

import pandas as pd
from pyspark.sql.functions import array, col, explode, lit, struct, concat, first
from pyspark.sql import DataFrame
from typing import Iterable

def melt(
        df: DataFrame,
        id_vars: Iterable[str], value_vars: Iterable[str],
        var_name: str = "variable", value_name: str = "value") -> DataFrame:
    """Convert :class:`DataFrame` from wide to long format."""
    # Create array<struct<variable: str, value: ...>>
    _vars_and_vals = array(*(
        struct(lit(c).alias(var_name), col(c).alias(value_name))
        for c in value_vars))
    # Add to the DataFrame and explode
    _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))
    cols = id_vars + [
        col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]]
    return _tmp.select(*cols)

pdf = pd.DataFrame({'date': ['2021-08-04 13:00:00',
                             '2021-08-04 13:00:00',
                             '2021-08-04 13:00:00',
                             '2021-08-04 13:00:00',
                             '2021-08-04 14:00:00',
                             '2021-08-04 14:00:00',
                             '2021-08-04 14:00:00',
                             '2021-08-04 14:00:00',
                             '2021-08-04 15:00:00',
                             '2021-08-04 15:00:00',
                             '2021-08-04 15:00:00',
                             '2021-08-04 15:00:00'],
                    'rank': [1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4],
                    'feat1': [22, 14, 9, 7, 16, 21, 17, 18, 19, 9, 10, 13],
                    'feat2': ['a', 'a', 'a', 'a', 'b', 'b', 'b', 'b', 'a', 'b', 'c', 'd']})
sdf = spark.createDataFrame(pdf)
sdf = melt(sdf, id_vars=['date', 'rank'], value_vars=['feat1', 'feat2'])
sdf = sdf.withColumn('rank', concat(lit('rank'), col('rank'), lit('_'), col('variable')))
sdf = sdf.groupby('date').pivot('rank').agg(first(col('value')))

Output:

+-------------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
|               date|rank1_feat1|rank1_feat2|rank2_feat1|rank2_feat2|rank3_feat1|rank3_feat2|rank4_feat1|rank4_feat2|
+-------------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
|2021-08-04 13:00:00|         22|          a|         14|          a|          9|          a|          7|          a|
|2021-08-04 14:00:00|         16|          b|         21|          b|         17|          b|         18|          b|
|2021-08-04 15:00:00|         19|          a|          9|          b|         10|          c|         13|          d|
+-------------------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
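
The columns come back interleaved by rank rather than grouped by feature as in the desired output. If the order matters, a final select can rearrange them; a small sketch, assuming the rankN_featM names produced above:

# Reorder columns so all feat1 columns precede all feat2 columns.
ordered = ['date'] + sorted(
    [c for c in sdf.columns if c != 'date'],
    key=lambda c: (c.split('_', 1)[1], int(c.split('_', 1)[0].replace('rank', ''))))
sdf = sdf.select(*ordered)

On Spark 3.4+ the hand-rolled melt helper can also be replaced by the built-in DataFrame.unpivot (aliased as melt).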
