PySpark - Explode columns into rows based on their type



Given a DataFrame:

+---+-----------+---------+-------+------------+
| id|      score|tx_amount|isValid|    greeting|
+---+-----------+---------+-------+------------+
|  1|        0.2|    23.78|   true| hello_world|
|  2|        0.6|    12.41|  false|byebye_world|
+---+-----------+---------+-------+------------+

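I want to explode these columns into rows, with the values in a column named "col_value", using the column types of the input DataFrame.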

df.dtypes
[('id', 'int'), ('model_score', 'double'), ('tx_amount', 'double'), ('isValid', 'boolean'), ('greeting', 'string')]

Expected output:

+---+------------+--------+---------+----------+-------+---------+ 
| id|   col_value|is_score|is_amount|is_boolean|is_text|col_name | 
+---+------------+--------+---------+----------+-------+---------+ 
|  1|         0.2|       Y|        N|         N|      N|score    | 
|  1|       23.78|       N|        Y|         N|      N|tx_amount| 
|  1|        true|       N|        N|         Y|      N|isValid  | 
|  1| hello_world|       N|        N|         N|      Y|greeting | 
|  2|         0.6|       Y|        N|         N|      N|score    | 
|  2|       12.41|       N|        Y|         N|      N|tx_amount| 
|  2|       false|       N|        N|         Y|      N|isValid  | 
|  2|byebye_world|       N|        N|         N|      Y|greeting | 
+---+------------+--------+---------+----------+-------+---------+ 

What I have so far:

(df.withColumn("cols", F.explode(F.arrays_zip(F.array("score", "tx_amount", "isValid", "greeting"))))
   .select("id", F.col("cols.*"))
   # ...
)

But when I try to zip the columns for use in explode, I get an error about the types:

pyspark.sql.utils.AnalysisException: "cannot resolve 'array(`id`, `model_score`, `tx_amount`, `isValid`, `greeting`)' due to data type mismatch: input to function array should all be the same type, but it's [int, double, double, boolean, string]

How can I do this when the input columns may all have different types?

Sample DataFrame:

df.show()
df.printSchema()
+---+-----------+---------+-------+------------+
| id|model_score|tx_amount|isValid|    greeting|
+---+-----------+---------+-------+------------+
|  1|        0.2|    23.78|   true| hello_world|
|  2|        0.6|    12.41|  false|byebye_world|
+---+-----------+---------+-------+------------+
root
|-- id: integer (nullable = true)
|-- model_score: double (nullable = true)
|-- tx_amount: double (nullable = true)
|-- isValid: boolean (nullable = true)
|-- greeting: string (nullable = true)

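I tried to keep it dynamic for any column input. It takes the types from df.dtypes[1:]; id is not part of col_value, which is why it is skipped ([1:]). array only accepts elements of the same type, which is why all columns are cast to string before the logic is applied. I think it should work for your use case. You can build your Y/N columns from here.

from pyspark.sql import functions as F

# Cast every column to string so the values fit in one array; df.dtypes still reads the original types.
(df.select([F.col(c).cast("string") for c in df.columns])
    .withColumn("cols", F.explode(F.arrays_zip(F.array([F.array(x[0], F.lit(x[1]), F.lit(x[0]))
                                                        for x in df.dtypes[1:]]))))
    # Each exploded element is a struct whose single field "0" holds [value, type, name].
    .select("id", F.col("cols.*"))
    .withColumn("col_value", F.element_at("0", 1))
    .withColumn("col_type", F.element_at("0", 2))
    .withColumn("col_name", F.element_at("0", 3))
    .drop("0").show())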
+---+------------+--------+-----------+
| id|   col_value|col_type|   col_name|
+---+------------+--------+-----------+
|  1|         0.2|  double|model_score|
|  1|       23.78|  double|  tx_amount|
|  1|        true| boolean|    isValid|
|  1| hello_world|  string|   greeting|
|  2|         0.6|  double|model_score|
|  2|       12.41|  double|  tx_amount|
|  2|       false| boolean|    isValid|
|  2|byebye_world|  string|   greeting|
+---+------------+--------+-----------+
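One way to add the Y/N columns on top of the output above is one CASE WHEN expression per flag. Note that model_score and tx_amount are both double, so is_score and is_amount cannot be derived from col_type alone and have to key on col_name. The mapping below (FLAG_CONDITIONS) and the helper build_flag_exprs are hypothetical, a sketch assuming the col_name/col_type columns produced above:

```python
# Hypothetical mapping from each Y/N flag column to a Spark SQL condition.
# model_score and tx_amount share the double type, so those two flags key on
# col_name; the boolean and text flags can key on col_type instead.
FLAG_CONDITIONS = {
    "is_score": "col_name = 'model_score'",
    "is_amount": "col_name = 'tx_amount'",
    "is_boolean": "col_type = 'boolean'",
    "is_text": "col_type = 'string'",
}


def build_flag_exprs(flag_conditions):
    """Return one SQL expression per flag: 'Y' when its condition holds, else 'N'."""
    return [
        f"CASE WHEN {cond} THEN 'Y' ELSE 'N' END AS {flag}"
        for flag, cond in flag_conditions.items()
    ]
```

Assuming the DataFrame shown above (before .show()) is bound to a hypothetical name exploded, the flags could then be added with exploded.selectExpr("id", "col_value", *build_flag_exprs(FLAG_CONDITIONS), "col_name").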

You can try a few unions:


from pyspark.sql import functions as F

df = df.select(
"id",
F.col("score").cast("string").alias("col_value"),
F.lit("Y").alias("is_score"),
F.lit("N").alias("is_amount"),
F.lit("N").alias("is_boolean"),
F.lit("N").alias("is_text"),
).union(df.select(
"id",
F.col("tx_amount").cast("string").alias("col_value"),
F.lit("N").alias("is_score"),
F.lit("Y").alias("is_amount"),
F.lit("N").alias("is_boolean"),
F.lit("N").alias("is_text"),
)).union(...) # etc
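Instead of writing one union branch per column by hand, the same unpivot can be generated from df.dtypes with Spark SQL's stack() generator. stack() also requires every output position to have a single type across rows, so the values are still cast to string. The helper build_stack_expr is a hypothetical sketch that assumes column names contain no backticks or single quotes:

```python
def build_stack_expr(dtypes, skip=("id",)):
    """Build a Spark SQL stack() expression that unpivots every column not in `skip`.

    Each generated row carries (col_name, col_type, col_value). The value is
    cast to STRING because stack() needs one type per output position, the
    same restriction that makes array() fail on mixed column types.
    """
    cols = [(name, dtype) for name, dtype in dtypes if name not in skip]
    parts = [f"'{name}', '{dtype}', CAST(`{name}` AS STRING)" for name, dtype in cols]
    return f"stack({len(cols)}, {', '.join(parts)}) AS (col_name, col_type, col_value)"
```

It would be used as df.selectExpr("id", build_stack_expr(df.dtypes)), which should give one row per (id, column) pair; the Y/N columns can then be derived from col_name and col_type as in the previous answer.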
