pyspark DF.show() 在 zipWithIndex 之后将 RDD 转换为 DF 后出错



我似乎正在遵循记录的方式,显示从具有模式的RDD转换的DF。但显然,我错过了一些次要但重要的点。如下:

# Original schema + Index for zipWithIndex with variations on this
schema = StructType(result_df.schema.fields[:] + [StructField("index", LongType(), True)])
rdd = result_df.rdd.zipWithIndex()
df = spark.createDataFrame(rdd, schema)
#df.select("*").show()
print(schema)

在执行操作之前,架构如下所示:

df:pyspark.sql.dataframe.DataFrame
ARRAY_COLS:array
element:string
index:long

图式:

StructType
(List(StructField
(ARRAY_COLS,ArrayType(StringType,true),false),
StructField(index,LongType,true)))

一旦我用 .show 执行操作,它就会爆炸。在这种情况下,我已经动态地做了一些事情,但并不是真的必要。

完整列表

from functools import reduce
from pyspark.sql.functions import lower, col, lit, concat, split
from pyspark.sql.types import * 
from pyspark.sql import Row
from pyspark.sql import functions as f
source_df = spark.createDataFrame(
[
(1, 11, 111),
(2, 22, 222)
],
["colA", "colB", "colC"]
)
intermediate_df = (reduce(
lambda df, col_name: df.withColumn(col_name, concat(lit(col_name), lit("_"), col(col_name))),
source_df.columns,
source_df
)     )
intermediate_df.show(truncate=False)
allCols = [x for x in intermediate_df.columns]
result_df = intermediate_df.select(f.concat_ws(',', *allCols).alias('CONCAT_COLS'))
result_df.show(truncate=False) 
result_df = result_df.select(split(col("CONCAT_COLS"), ",s*").alias("ARRAY_COLS"))
result_df.show(truncate=False) 
#######
schema = StructType(result_df.schema.fields[:] + [StructField("index", LongType(), True)])
rdd = result_df.rdd.zipWithIndex()
df = spark.createDataFrame(rdd, schema)
df.select("*").show() 
print(schema)

如果你看rdd,问题会变得更加清晰:

print(rdd.collect())
#[(Row(ARRAY_COLS=[u'colA_1', u'colB_11', u'colC_111']), 0),
# (Row(ARRAY_COLS=[u'colA_2', u'colB_22', u'colC_222']), 1)]

请注意,它是一个包含Row对象和索引的元组。

我看到两种选择:

1( 使用元组推导从Row中提取值,并将rddmap到元组匹配schema

rdd1 = rdd.map(
lambda row: tuple(row[0].asDict()[c] for c in schema.fieldNames()[:-1]) + (row[1],)
)
df1 = spark.createDataFrame(rdd1, schema)
df1.show(truncate=False)
#+---------------------------+-----+
#|ARRAY_COLS                 |index|
#+---------------------------+-----+
#|[colA_1, colB_11, colC_111]|0    |
#|[colA_2, colB_22, colC_222]|1    |
#+---------------------------+-----+

这将创建一个包含每条记录内容的新tuple

print(rdd1.collect())
#[([u'colA_1', u'colB_11', u'colC_111'], 0),
# ([u'colA_2', u'colB_22', u'colC_222'], 1)]

2( 通过添加index和解压缩现有Row来构建新Row

rdd2 = rdd.map(lambda row: Row(index=0, **row[0].asDict()))
df2 = spark.createDataFrame(rdd2, schema)
df2.show(truncate=False)
#+---------------------------+-----+
#|ARRAY_COLS                 |index|
#+---------------------------+-----+
#|[colA_1, colB_11, colC_111]|0    |
#|[colA_2, colB_22, colC_222]|1    |
#+---------------------------+-----+

现在,每条记录都是一个添加了indexRow

print(rdd2.collect())
#[Row(ARRAY_COLS=[u'colA_1', u'colB_11', u'colC_111'], index=0),
# Row(ARRAY_COLS=[u'colA_2', u'colB_22', u'colC_222'], index=0)]

但正因为如此,你不需要在调用createDataFrame中使用schema

spark.createDataFrame(rdd2).show()
#+---------------------------+-----+
#|ARRAY_COLS                 |index|
#+---------------------------+-----+
#|[colA_1, colB_11, colC_111]|0    |
#|[colA_2, colB_22, colC_222]|1    |
#+---------------------------+-----+

方法1 使用您定义的现有schema,而方法 2 可能更紧凑,代码明智(但需要硬编码index=(。

最新更新