Unpivot a PySpark DataFrame with multiple data types



Suppose I have the following Spark DataFrame (df):

from datetime import datetime
import pandas as pd
ts = [datetime(2022, 1, 1, 0), datetime(2022, 1, 1, 1), datetime(2022, 1, 1, 2)] 
pandas_df = pd.DataFrame({"Timestamp": ts, "column_a": ["a", "b", "c"], "column_b": [1.1, 2.2, 3.3], "column_c": [5, 6, 7]})
df = spark.createDataFrame(pandas_df)
df = df.withColumn("column_c", df.column_c.cast("int"))

It looks like this:

Timestamp   column_a    column_b    column_c
0   2022-01-01 00:00:00 a   1.1 5
1   2022-01-01 01:00:00 b   2.2 6
2   2022-01-01 02:00:00 c   3.3 7

Now I want to unpivot it with PySpark and get the following result:

Timestamp   Columns string  double  int
2022-01-01T02:00:00.000+0000    column_c    null    null    7
2022-01-01T02:00:00.000+0000    column_b    null    3.3 null
2022-01-01T02:00:00.000+0000    column_a    c   null    null
2022-01-01T01:00:00.000+0000    column_c    null    null    6
2022-01-01T01:00:00.000+0000    column_b    null    2.2 null
2022-01-01T01:00:00.000+0000    column_a    b   null    null
2022-01-01T00:00:00.000+0000    column_c    null    null    5
2022-01-01T00:00:00.000+0000    column_b    null    1.1 null
2022-01-01T00:00:00.000+0000    column_a    a   null    null

One way I can think of is to split the columns out by type and join the resulting DataFrames:

import pyspark.sql.functions as func

final_df = None
for data_type in ["string", "double", "int"]:
    dtype_cols = [item[0] for item in df.dtypes if item[1] == data_type]
    stack_str = ",".join([f"'{item}', {item}" for item in dtype_cols])
    expression = f"stack({len(dtype_cols)}, {stack_str}) as (Columns, {data_type})"
    untransposed_df = df.select("Timestamp", func.expr(expression))
    if not final_df:
        final_df = untransposed_df
    else:
        final_df = final_df.join(untransposed_df, ["Timestamp", "Columns"], how="outer")

This doesn't look very efficient, since it involves multiple joins. Is there a better way?

Thanks!

With a bit more time, here is the complete answer:

from pyspark.sql.functions import array, col, explode, lit, struct

# Use a list comprehension to pull out the fields we will need.
no_timestamp_columns = [field for field in df.schema.fields if field.name != 'timestamp']

df.select(
    col("timestamp"),
    explode(  # make many rows from this array
        array(*[  # use a list comprehension to build the array
            struct(  # use a struct to group the data we want as columns together
                lit(column.name).alias("column"),  # the name of the existing column
                *[
                    # Create the same set of typed fields in every struct, nulling out
                    # the ones that aren't the column we are working with and providing
                    # the value for the one that is.
                    ((lit(None) if value.name != column.name else col(column.name)).cast(value.dataType)).alias(str(value.dataType))
                    for value in no_timestamp_columns
                ]
            )
            for column in no_timestamp_columns
        ])
    ).alias("columns")
).select(
    col("timestamp"),
    col("columns.*"),
).show()
+--------------------+------+----------+--------+
|           timestamp|column|StringType|LongType|
+--------------------+------+----------+--------+
|2022-01-28 23:32:...|  Col1|      str1|    null|
|2022-01-28 23:32:...|  Col2|      null|     100|
|2022-02-28 23:02:...|  Col1|      str2|    null|
|2022-02-28 23:02:...|  Col2|      null|     202|
|2022-02-28 17:22:...|  Col1|      str3|    null|
|2022-02-28 17:22:...|  Col2|      null|     102|
|2022-02-28 23:19:...|  Col1|      str4|    null|
|2022-02-28 23:19:...|  Col2|      null|     102|
|2022-03-29 17:32:...|  Col1|      str5|    null|
|2022-03-29 17:32:...|  Col2|      null|     102|
|2022-01-28 23:32:...|  Col1|      str6|    null|
|2022-01-28 23:32:...|  Col2|      null|     101|
|2022-02-28 17:28:...|  Col1|      str7|    null|
|2022-02-28 17:28:...|  Col2|      null|     201|
|2022-03-28 23:59:...|  Col1|      str8|    null|
|2022-03-28 23:59:...|  Col2|      null|     100|
|2022-02-28 21:02:...|  Col1|      str9|    null|
|2022-02-28 21:02:...|  Col2|      null|     100|
+--------------------+------+----------+--------+

If you use an array, you have to keep the shape of the structs consistent; that is what took me the longest to work out. You can either cast every column to StringType and decide in the select how to cast it back, or take the approach I used above: keep the original types but null-fill them in a consistent way.
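For what it's worth, here is a minimal sketch of that cast-everything-to-StringType alternative. The names value_columns, "kv", "column" and "value" are my own choices for illustration, not from the original answer:

from pyspark.sql.functions import array, col, explode, lit, struct

value_columns = [field.name for field in df.schema.fields if field.name != 'timestamp']

df.select(
    col("timestamp"),
    explode(
        array(*[
            struct(
                lit(name).alias("column"),
                col(name).cast("string").alias("value")  # one common type, so every struct has the same shape
            )
            for name in value_columns
        ])
    ).alias("kv")
).select(
    col("timestamp"),
    col("kv.*")
).show()

This gives one string-typed "value" column; you would then cast values back per column name in a later select if you need the original types.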

Here is most of an answer. Admittedly it isn't in the exact format you asked for, but it follows the spirit; it's just missing some of the nulls. You could refine the logic further to produce exactly the format you want (a rough sketch of such a refinement follows the output below), but I'm not sure the investment is worth it. With more time you could build sub-structs inside the struct for the column values, keyed by column name, and use the exploded column name to pick the value you want, but this gives you the general idea of how to go about it.

from pyspark.sql.functions import array, col, explode, lit, struct

data = [
    ("2022-01-28 23:32:52.0", "str1", 100),
    ("2022-02-28 23:02:40.0", "str2", 202),
    ("2022-02-28 17:22:45.0", "str3", 102),
    ("2022-02-28 23:19:37.0", "str4", 102),
    ("2022-03-29 17:32:02.0", "str5", 102),
    ("2022-01-28 23:32:40.0", "str6", 101),
    ("2022-02-28 17:28:09.0", "str7", 201),
    ("2022-03-28 23:59:54.0", "str8", 100),
    ("2022-02-28 21:02:40.0", "str9", 100),
]

df = spark.createDataFrame(data)
df = df.toDF("timestamp", "Col1", "Col2")
df.show()
+--------------------+----+----+                                                
|           timestamp|Col1|Col2|
+--------------------+----+----+
|2022-01-28 23:32:...|str1| 100|
|2022-02-28 23:02:...|str2| 202|
|2022-02-28 17:22:...|str3| 102|
|2022-02-28 23:19:...|str4| 102|
|2022-03-29 17:32:...|str5| 102|
|2022-01-28 23:32:...|str6| 101|
|2022-02-28 17:28:...|str7| 201|
|2022-03-28 23:59:...|str8| 100|
|2022-02-28 21:02:...|str9| 100|
+--------------------+----+----+
df.select(
    col("timestamp"),
    array(  # build an array of the column names
        *[lit(field.name) for field in df.schema.fields if field.name != 'timestamp']
    ).alias("columns"),
    struct(  # build a struct of the values, keyed by data type (pass varargs via a list comprehension)
        *[col(field.name).alias(str(field.dataType)) for field in df.schema.fields if field.name != 'timestamp']
    ).alias("data")
).select(
    col("timestamp"),
    explode(col("columns")),
    col("data.*")
).show()
+--------------------+----+----------+--------+
|           timestamp| col|StringType|LongType|
+--------------------+----+----------+--------+
|2022-01-28 23:32:...|Col1|      str1|     100|
|2022-01-28 23:32:...|Col2|      str1|     100|
|2022-02-28 23:02:...|Col1|      str2|     202|
|2022-02-28 23:02:...|Col2|      str2|     202|
|2022-02-28 17:22:...|Col1|      str3|     102|
|2022-02-28 17:22:...|Col2|      str3|     102|
|2022-02-28 23:19:...|Col1|      str4|     102|
|2022-02-28 23:19:...|Col2|      str4|     102|
|2022-03-29 17:32:...|Col1|      str5|     102|
|2022-03-29 17:32:...|Col2|      str5|     102|
|2022-01-28 23:32:...|Col1|      str6|     101|
|2022-01-28 23:32:...|Col2|      str6|     101|
|2022-02-28 17:28:...|Col1|      str7|     201|
|2022-02-28 17:28:...|Col2|      str7|     201|
|2022-03-28 23:59:...|Col1|      str8|     100|
|2022-03-28 23:59:...|Col2|      str8|     100|
|2022-02-28 21:02:...|Col1|      str9|     100|
|2022-02-28 21:02:...|Col2|      str9|     100|
+--------------------+----+----------+--------+
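If you did want to push this partial answer the rest of the way to the requested shape, one possible refinement (my own sketch, not part of the original answer; the names value_fields and partial_df are assumptions) is to null out each typed value whenever the exploded column name does not match the column it came from:

from pyspark.sql.functions import array, col, explode, lit, when

value_fields = [field for field in df.schema.fields if field.name != 'timestamp']

# Same unpivot as above, just without the intermediate struct.
partial_df = df.select(
    col("timestamp"),
    explode(array(*[lit(field.name) for field in value_fields])).alias("col"),
    *[col(field.name).alias(str(field.dataType)) for field in value_fields]
)

# Keep a typed value only on the row whose exploded name matches its source column.
partial_df.select(
    "timestamp",
    "col",
    *[
        when(col("col") == field.name, col(str(field.dataType)))
            .otherwise(lit(None).cast(field.dataType))
            .alias(str(field.dataType))
        for field in value_fields
    ]
).show()

This is the same unpivot followed by one extra pass over the columns, so it avoids the joins of the original approach while filling in the missing nulls.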
