如何融化 Spark 数据帧



在PySpark或至少在Scala中,Apache Spark中是否有等效的Pandas Melt函数?

到目前为止,我一直在Python中运行一个示例数据集,现在我想将Spark用于整个数据集。

Spark>= 3.4

在 Spark 3.4 或更高版本中,您可以使用内置melt方法

(sdf
    .melt(
        ids=['A'], values=['B', 'C'], 
        variableColumnName="variable", 
        valueColumnName="value")
    .show())
+---+--------+-----+
|  A|variable|value|
+---+--------+-----+
|  a|       B|    1|
|  a|       C|    2|
|  b|       B|    3|
|  b|       C|    4|
|  c|       B|    5|
|  c|       C|    6|
+---+--------+-----+

此方法在所有 API 中都可用,因此可以在 Scala 中使用。

sdf.melt(Array($"A"), Array($"B", $"C"), "variable", "value")

或 SQL

SELECT * FROM sdf UNPIVOT (val FOR col in (col_1, col_2))

Spark 3.2(仅限Python,需要Pandas和pyarrow)

(sdf
    .to_koalas()
    .melt(id_vars=['A'], value_vars=['B', 'C'])
    .to_spark()
    .show())
+---+--------+-----+
|  A|variable|value|
+---+--------+-----+
|  a|       B|    1|
|  a|       C|    2|
|  b|       B|    3|
|  b|       C|    4|
|  c|       B|    5|
|  c|       C|    6|
+---+--------+-----+

火花<3.2

没有内置函数(如果您启用了SQL和Hive支持,则可以使用该函数stack但它不会在Spark中公开并且没有本机实现),但是创建自己的函数是微不足道的。所需导入:

from pyspark.sql.functions import array, col, explode, lit, struct
from pyspark.sql import DataFrame
from typing import Iterable 

实现示例:

def melt(
        df: DataFrame, 
        id_vars: Iterable[str], value_vars: Iterable[str], 
        var_name: str="variable", value_name: str="value") -> DataFrame:
    """Convert :class:`DataFrame` from wide to long format."""
    # Create array<struct<variable: str, value: ...>>
    _vars_and_vals = array(*(
        struct(lit(c).alias(var_name), col(c).alias(value_name)) 
        for c in value_vars))
    # Add to the DataFrame and explode
    _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))
    cols = id_vars + [
            col("_vars_and_vals")[x].alias(x) for x in [var_name, value_name]]
    return _tmp.select(*cols)

还有一些测试(基于熊猫文档测试):

import pandas as pd
pdf = pd.DataFrame({'A': {0: 'a', 1: 'b', 2: 'c'},
                   'B': {0: 1, 1: 3, 2: 5},
                   'C': {0: 2, 1: 4, 2: 6}})
pd.melt(pdf, id_vars=['A'], value_vars=['B', 'C'])
   A variable  value
0  a        B      1
1  b        B      3
2  c        B      5
3  a        C      2
4  b        C      4
5  c        C      6
sdf = spark.createDataFrame(pdf)
melt(sdf, id_vars=['A'], value_vars=['B', 'C']).show()
+---+--------+-----+
|  A|variable|value|
+---+--------+-----+
|  a|       B|    1|
|  a|       C|    2|
|  b|       B|    3|
|  b|       C|    4|
|  c|       B|    5|
|  c|       C|    6|
+---+--------+-----+

注意:要与旧版 Python 一起使用,请删除类型注释。

相关:

  • R SparkR - 相当于熔化功能
  • 聚集在闪闪发光的

在我为 Scala 寻找 Spark 中melt的实现时遇到了这个问题。

发布我的 Scala 端口,以防有人也偶然发现这个。

import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame}
/** Extends the [[org.apache.spark.sql.DataFrame]] class
 *
 *  @param df the data frame to melt
 */
implicit class DataFrameFunctions(df: DataFrame) {
    /** Convert [[org.apache.spark.sql.DataFrame]] from wide to long format.
     * 
     *  melt is (kind of) the inverse of pivot
     *  melt is currently (02/2017) not implemented in spark
     *
     *  @see reshape packe in R (https://cran.r-project.org/web/packages/reshape/index.html)
     *  @see this is a scala adaptation of http://stackoverflow.com/questions/41670103/pandas-melt-function-in-apache-spark
     *  
     *  @todo method overloading for simple calling
     *
     *  @param id_vars the columns to preserve
     *  @param value_vars the columns to melt
     *  @param var_name the name for the column holding the melted columns names
     *  @param value_name the name for the column holding the values of the melted columns
     *
     */
    def melt(
            id_vars: Seq[String], value_vars: Seq[String], 
            var_name: String = "variable", value_name: String = "value") : DataFrame = {
        // Create array<struct<variable: str, value: ...>>
        val _vars_and_vals = array((for (c <- value_vars) yield { struct(lit(c).alias(var_name), col(c).alias(value_name)) }): _*)
        // Add to the DataFrame and explode
        val _tmp = df.withColumn("_vars_and_vals", explode(_vars_and_vals))
        val cols = id_vars.map(col _) ++ { for (x <- List(var_name, value_name)) yield { col("_vars_and_vals")(x).alias(x) }}
        return _tmp.select(cols: _*)
    }
}

考虑到Scala,我不是那么先进,我相信还有改进的余地。

欢迎任何意见。

投票支持user6910411的答案。它按预期工作,但是,它不能很好地处理 None 值。因此,我将他的熔体函数重构为以下内容:

from pyspark.sql.functions import array, col, explode, lit
from pyspark.sql.functions import create_map
from pyspark.sql import DataFrame
from typing import Iterable 
from itertools import chain
def melt(
        df: DataFrame, 
        id_vars: Iterable[str], value_vars: Iterable[str], 
        var_name: str="variable", value_name: str="value") -> DataFrame:
    """Convert :class:`DataFrame` from wide to long format."""
    # Create map<key: value>
    _vars_and_vals = create_map(
        list(chain.from_iterable([
            [lit(c), col(c)] for c in value_vars]
        ))
    )
    _tmp = df.select(*id_vars, explode(_vars_and_vals)) 
        .withColumnRenamed('key', var_name) 
        .withColumnRenamed('value', value_name)
    return _tmp

使用以下数据帧进行测试:

import pandas as pd
pdf = pd.DataFrame({'A': {0: 'a', 1: 'b', 2: 'c'},
                   'B': {0: 1, 1: 3, 2: 5},
                   'C': {0: 2, 1: 4, 2: 6},
                   'D': {1: 7, 2: 9}})
pd.melt(pdf, id_vars=['A'], value_vars=['B', 'C', 'D'])
A   variable    value
0   a   B   1.0
1   b   B   3.0
2   c   B   5.0
3   a   C   2.0
4   b   C   4.0
5   c   C   6.0
6   a   D   NaN
7   b   D   7.0
8   c   D   9.0
sdf = spark.createDataFrame(pdf)
melt(sdf, id_vars=['A'], value_vars=['B', 'C', 'D']).show()
+---+--------+-----+
|  A|variable|value|
+---+--------+-----+
|  a|       B|  1.0|
|  a|       C|  2.0|
|  a|       D|  NaN|
|  b|       B|  3.0|
|  b|       C|  4.0|
|  b|       D|  7.0|
|  c|       B|  5.0|
|  c|       C|  6.0|
|  c|       D|  9.0|
+---+--------+-----+

UPD

最后,我找到了最有效的实施方式。它使用我的纱线配置中集群的所有资源。

from pyspark.sql.functions import explode
def melt(df):
    sp = df.columns[1:]
    return (df
            .rdd
            .map(lambda x: [str(x[0]), [(str(i[0]), 
                                         float(i[1] if i[1] else 0)) for i in zip(sp, x[1:])]], 
                 preservesPartitioning = True)
            .toDF()
            .withColumn('_2', explode('_2'))
            .rdd.map(lambda x: [str(x[0]), 
                                str(x[1][0]), 
                                float(x[1][1] if x[1][1] else 0)], 
                     preservesPartitioning = True)
            .toDF()
            )

对于非常宽的数据帧,我从 user6910411 答案生成_vars_and_vals性能下降。

通过 selectExpr 实现熔化很有用

columns=['a', 'b', 'c', 'd', 'e', 'f']
pd_df = pd.DataFrame([[1,2,3,4,5,6], [4,5,6,7,9,8], [7,8,9,1,2,4], [8,3,9,8,7,4]], columns=columns)
df = spark.createDataFrame(pd_df)
+---+---+---+---+---+---+
|  a|  b|  c|  d|  e|  f|
+---+---+---+---+---+---+
|  1|  2|  3|  4|  5|  6|
|  4|  5|  6|  7|  9|  8|
|  7|  8|  9|  1|  2|  4|
|  8|  3|  9|  8|  7|  4|
+---+---+---+---+---+---+
cols = df.columns[1:]
df.selectExpr('a', "stack({}, {})".format(len(cols), ', '.join(("'{}', {}".format(i, i) for i in cols))))
+---+----+----+
|  a|col0|col1|
+---+----+----+
|  1|   b|   2|
|  1|   c|   3|
|  1|   d|   4|
|  1|   e|   5|
|  1|   f|   6|
|  4|   b|   5|
|  4|   c|   6|
|  4|   d|   7|
|  4|   e|   9|
|  4|   f|   8|
|  7|   b|   8|
|  7|   c|   9|
...

使用列表推导创建列名和列值的结构列,并使用魔术内联分解新列。下面的代码;

    melted_df=(df.withColumn(
                 #Create struct of column names and corresponding values
                'tab',F.array(*[F.struct(lit(x).alias('var'),F.col(x).alias('val'))for x in df.columns if x!='A'] ))
                 #Explode the column
                 .selectExpr('A',"inline(tab)")
          
)
  
melted_df.show()
+---+---+---+
|  A|var|val|
+---+---+---+
|  a|  B|  1|
|  a|  C|  2|
|  b|  B|  3|
|  b|  C|  4|
|  c|  B|  5|
|  c|  C|  6|
+---+---+---+

1) Copy & paste
2) 更改前 2 个变量

to_melt = {'latin', 'greek', 'chinese'}
new_names = ['lang', 'letter']
melt_str = ','.join([f"'{c}', `{c}`" for c in to_melt])
df = df.select(
    *(set(df.columns) - to_melt),
    F.expr(f"stack({len(to_melt)}, {melt_str}) ({','.join(new_names)})")
)

如果某些值包含 null,则创建null。要删除它,请添加以下内容:

.filter(f"!{new_names[1]} is null")
<小时 />

全面测试:

from pyspark.sql import functions as F
df = spark.createDataFrame([(101, "A", "Σ", "西"), (102, "B", "Ω", "诶")], ['ID', 'latin', 'greek', 'chinese'])
df.show()
# +---+-----+-----+-------+
# | ID|latin|greek|chinese|
# +---+-----+-----+-------+
# |101|    A|    Σ|     西|
# |102|    B|    Ω|     诶|
# +---+-----+-----+-------+
to_melt = {'latin', 'greek', 'chinese'}
new_names = ['lang', 'letter']
melt_str = ','.join([f"'{c}', `{c}`" for c in to_melt])
df = df.select(
    *(set(df.columns) - to_melt),
    F.expr(f"stack({len(to_melt)}, {melt_str}) ({','.join(new_names)})")
)
df.show()
# +---+-------+------+
# | ID|   lang|letter|
# +---+-------+------+
# |101|  latin|     A|
# |101|  greek|     Σ|
# |101|chinese|    西|
# |102|  latin|     B|
# |102|  greek|     Ω|
# |102|chinese|    诶|
# +---+-------+------+

相关内容

  • 没有找到相关文章

最新更新