假设我有以下spark数据帧(df):
# Build the example Spark DataFrame: three hourly timestamps plus one
# string, one double, and one int column.
from datetime import datetime
import pandas as pd

hours = [datetime(2022, 1, 1, h) for h in (0, 1, 2)]
pdf = pd.DataFrame(
    {
        "Timestamp": hours,
        "column_a": ["a", "b", "c"],
        "column_b": [1.1, 2.2, 3.3],
        "column_c": [5, 6, 7],
    }
)
df = spark.createDataFrame(pdf)
# Pandas integers arrive as bigint; narrow column_c to int explicitly.
df = df.withColumn("column_c", df["column_c"].cast("int"))
就像这样:
Timestamp column_a column_b column_c
0 2022-01-01 00:00:00 a 1.1 5
1 2022-01-01 01:00:00 b 2.2 6
2 2022-01-01 02:00:00 c 3.3 7
现在,我想使用pyspark来解开它,并得到以下结果:
Timestamp Columns string double int
2022-01-01T02:00:00.000+0000 column_c null null 7
2022-01-01T02:00:00.000+0000 column_b null 3.3 null
2022-01-01T02:00:00.000+0000 column_a c null null
2022-01-01T01:00:00.000+0000 column_c null null 6
2022-01-01T01:00:00.000+0000 column_b null 2.2 null
2022-01-01T01:00:00.000+0000 column_a b null null
2022-01-01T00:00:00.000+0000 column_c null null 5
2022-01-01T00:00:00.000+0000 column_b null 1.1 null
2022-01-01T00:00:00.000+0000 column_a a null null
我能想到的一种方法是将每个类型分开并连接结果数据框:
import pyspark.sql.functions as func

# Baseline approach the question asks to improve: un-pivot one data type
# at a time with SQL `stack`, then outer-join the per-type frames on
# (Timestamp, Columns).  The repeated joins are what makes it expensive.
final_df = None
for data_type in ["string", "double", "int"]:
    # Columns of the current Spark SQL type (df.dtypes yields (name, type) pairs).
    dtype_cols = [name for name, dtype in df.dtypes if dtype == data_type]
    stack_str = ",".join(f"'{name}', {name}" for name in dtype_cols)
    expression = f"stack({len(dtype_cols)}, {stack_str}) as (Columns, {data_type})"
    untransposed_df = df.select("Timestamp", func.expr(expression))
    # Compare against None explicitly: DataFrame truthiness is not a
    # reliable "is this the first iteration" test.
    if final_df is None:
        final_df = untransposed_df
    else:
        final_df = final_df.join(untransposed_df, ["Timestamp", "Columns"], how="outer")
这看起来不是很有效,因为它涉及多个连接。有更好的方法吗?
谢谢!
现在时间充裕了,补上完整答案如下:
from pyspark.sql.functions import array, col, explode, lit, struct

# One pass, no joins: build an array of structs -- one struct per data
# column -- and explode it into rows.  Every struct carries the same
# field layout (one field per source column's data type), with null in
# every field except the one matching the column being emitted, so the
# array elements stay type-consistent.
no_timestamp_columns = [
    field for field in df.schema.fields if field.name != "timestamp"
]

df.select(
    col("timestamp"),
    explode(  # turn the array of per-column structs into one row each
        array(*[
            struct(
                lit(column.name).alias("column"),  # name of the source column
                *[
                    # Null out every slot except the one whose type matches
                    # this column; the cast keeps the struct shape identical
                    # across all array elements, which `array` requires.
                    ((lit(None) if value.name != column.name else col(column.name))
                     .cast(value.dataType)).alias(str(value.dataType))
                    for value in no_timestamp_columns
                ],
            )
            for column in no_timestamp_columns
        ])
    ).alias("columns"),
).select(
    col("timestamp"),
    col("columns.*"),
).show()
+--------------------+------+----------+--------+
| timestamp|column|StringType|LongType|
+--------------------+------+----------+--------+
|2022-01-28 23:32:...| Col1| str1| null|
|2022-01-28 23:32:...| Col2| null| 100|
|2022-02-28 23:02:...| Col1| str2| null|
|2022-02-28 23:02:...| Col2| null| 202|
|2022-02-28 17:22:...| Col1| str3| null|
|2022-02-28 17:22:...| Col2| null| 102|
|2022-02-28 23:19:...| Col1| str4| null|
|2022-02-28 23:19:...| Col2| null| 102|
|2022-03-29 17:32:...| Col1| str5| null|
|2022-03-29 17:32:...| Col2| null| 102|
|2022-01-28 23:32:...| Col1| str6| null|
|2022-01-28 23:32:...| Col2| null| 101|
|2022-02-28 17:28:...| Col1| str7| null|
|2022-02-28 17:28:...| Col2| null| 201|
|2022-03-28 23:59:...| Col1| str8| null|
|2022-03-28 23:59:...| Col2| null| 100|
|2022-02-28 21:02:...| Col1| str9| null|
|2022-02-28 21:02:...| Col2| null| 100|
+--------------------+------+----------+--------+
如果你使用数组,就必须保证数组中每个结构体的字段布局一致——这一点我花了不少时间才想明白。你可以先把所有列都强制转换为 string 类型,再在 select 时决定如何转换回来;也可以采用我上面的做法:保留各列的原始类型,但以一致的方式用空值填充其余字段。
下面是答案的大部分内容。诚然,它并不是你要求的确切格式,但思路是一致的,只是缺少一些空值。你可以进一步完善逻辑来得到你感兴趣的格式,但我不确定这笔投入是否值得。如果愿意再多花些时间,可以在 struct 中按列名为每个列值再嵌套一层子结构,然后借助 columns 列来选取想要的值——不过上面的代码已经给出了大致的做法。
# NOTE: import everything the answer's snippets use -- the original
# import line was missing `explode` and `lit`.
from pyspark.sql.functions import array, col, explode, first, lit, struct

# Sample data: a timestamp plus one string column and one long column.
data = [
    ("2022-01-28 23:32:52.0", "str1", 100),
    ("2022-02-28 23:02:40.0", "str2", 202),
    ("2022-02-28 17:22:45.0", "str3", 102),
    ("2022-02-28 23:19:37.0", "str4", 102),
    ("2022-03-29 17:32:02.0", "str5", 102),
    ("2022-01-28 23:32:40.0", "str6", 101),
    ("2022-02-28 17:28:09.0", "str7", 201),
    ("2022-03-28 23:59:54.0", "str8", 100),
    ("2022-02-28 21:02:40.0", "str9", 100),
]

df = spark.createDataFrame(data)
df = df.toDF("timestamp", "Col1", "Col2")
df.show()
+--------------------+----+----+
| timestamp|Col1|Col2|
+--------------------+----+----+
|2022-01-28 23:32:...|str1| 100|
|2022-02-28 23:02:...|str2| 202|
|2022-02-28 17:22:...|str3| 102|
|2022-02-28 23:19:...|str4| 102|
|2022-03-29 17:32:...|str5| 102|
|2022-01-28 23:32:...|str6| 101|
|2022-02-28 17:28:...|str7| 201|
|2022-03-28 23:59:...|str8| 100|
|2022-02-28 21:02:...|str9| 100|
+--------------------+----+----+
# Alternative sketch: explode only the ARRAY of column names and carry the
# full struct of values on every row (values are not nulled per row, which
# is why the output below repeats both values on each row).
# Wrapped in parentheses so the leading-dot method chain is valid Python.
(
    df.select(
        col("timestamp"),
        # Array of the data-column names, to be exploded into one row each.
        array(*[
            lit(field.name)
            for field in df.schema.fields
            if field.name != "timestamp"
        ]).alias("columns"),
        # Struct of all values, each field named after its Spark data type.
        struct(*[
            col(field.name).alias(str(field.dataType))
            for field in df.schema.fields
            if field.name != "timestamp"
        ]).alias("data"),
    )
    .select(
        col("timestamp"),
        explode(col("columns")),
        col("data.*"),
    )
    .show()
)
+--------------------+----+----------+--------+
| timestamp| col|StringType|LongType|
+--------------------+----+----------+--------+
|2022-01-28 23:32:...|Col1| str1| 100|
|2022-01-28 23:32:...|Col2| str1| 100|
|2022-02-28 23:02:...|Col1| str2| 202|
|2022-02-28 23:02:...|Col2| str2| 202|
|2022-02-28 17:22:...|Col1| str3| 102|
|2022-02-28 17:22:...|Col2| str3| 102|
|2022-02-28 23:19:...|Col1| str4| 102|
|2022-02-28 23:19:...|Col2| str4| 102|
|2022-03-29 17:32:...|Col1| str5| 102|
|2022-03-29 17:32:...|Col2| str5| 102|
|2022-01-28 23:32:...|Col1| str6| 101|
|2022-01-28 23:32:...|Col2| str6| 101|
|2022-02-28 17:28:...|Col1| str7| 201|
|2022-02-28 17:28:...|Col2| str7| 201|
|2022-03-28 23:59:...|Col1| str8| 100|
|2022-03-28 23:59:...|Col2| str8| 100|
|2022-02-28 21:02:...|Col1| str9| 100|
|2022-02-28 21:02:...|Col2| str9| 100|
+--------------------+----+----------+--------+