我似乎正在遵循记录的方式,显示从具有模式的RDD转换的DF。但显然,我错过了一些次要但重要的点。如下:
# Original schema + Index for zipWithIndex with variations on this
schema = StructType(result_df.schema.fields[:] + [StructField("index", LongType(), True)])
rdd = result_df.rdd.zipWithIndex()
df = spark.createDataFrame(rdd, schema)
#df.select("*").show()
print(schema)
在执行操作之前,架构如下所示:
df:pyspark.sql.dataframe.DataFrame
ARRAY_COLS:array
element:string
index:long
图式:
StructType
(List(StructField
(ARRAY_COLS,ArrayType(StringType,true),false),
StructField(index,LongType,true)))
一旦我用 .show 执行操作,它就会爆炸。在这种情况下,我已经动态地做了一些事情,但并不是真的必要。
完整列表
from functools import reduce
from pyspark.sql.functions import lower, col, lit, concat, split
from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.sql import functions as f
source_df = spark.createDataFrame(
[
(1, 11, 111),
(2, 22, 222)
],
["colA", "colB", "colC"]
)
intermediate_df = (reduce(
lambda df, col_name: df.withColumn(col_name, concat(lit(col_name), lit("_"), col(col_name))),
source_df.columns,
source_df
) )
intermediate_df.show(truncate=False)
allCols = [x for x in intermediate_df.columns]
result_df = intermediate_df.select(f.concat_ws(',', *allCols).alias('CONCAT_COLS'))
result_df.show(truncate=False)
result_df = result_df.select(split(col("CONCAT_COLS"), ",s*").alias("ARRAY_COLS"))
result_df.show(truncate=False)
#######
schema = StructType(result_df.schema.fields[:] + [StructField("index", LongType(), True)])
rdd = result_df.rdd.zipWithIndex()
df = spark.createDataFrame(rdd, schema)
df.select("*").show()
print(schema)
如果你看rdd
,问题会变得更加清晰:
print(rdd.collect())
#[(Row(ARRAY_COLS=[u'colA_1', u'colB_11', u'colC_111']), 0),
# (Row(ARRAY_COLS=[u'colA_2', u'colB_22', u'colC_222']), 1)]
请注意,它是一个包含Row
对象和索引的元组。
我看到两种选择:
1( 使用元组推导从Row
中提取值,并将rdd
map
到元组匹配schema
:
rdd1 = rdd.map(
lambda row: tuple(row[0].asDict()[c] for c in schema.fieldNames()[:-1]) + (row[1],)
)
df1 = spark.createDataFrame(rdd1, schema)
df1.show(truncate=False)
#+---------------------------+-----+
#|ARRAY_COLS |index|
#+---------------------------+-----+
#|[colA_1, colB_11, colC_111]|0 |
#|[colA_2, colB_22, colC_222]|1 |
#+---------------------------+-----+
这将创建一个包含每条记录内容的新tuple
:
print(rdd1.collect())
#[([u'colA_1', u'colB_11', u'colC_111'], 0),
# ([u'colA_2', u'colB_22', u'colC_222'], 1)]
2( 通过添加index
和解压缩现有Row
来构建新Row
:
rdd2 = rdd.map(lambda row: Row(index=0, **row[0].asDict()))
df2 = spark.createDataFrame(rdd2, schema)
df2.show(truncate=False)
#+---------------------------+-----+
#|ARRAY_COLS |index|
#+---------------------------+-----+
#|[colA_1, colB_11, colC_111]|0 |
#|[colA_2, colB_22, colC_222]|1 |
#+---------------------------+-----+
现在,每条记录都是一个添加了index
的Row
:
print(rdd2.collect())
#[Row(ARRAY_COLS=[u'colA_1', u'colB_11', u'colC_111'], index=0),
# Row(ARRAY_COLS=[u'colA_2', u'colB_22', u'colC_222'], index=0)]
但正因为如此,你不需要在调用createDataFrame
中使用schema
:
spark.createDataFrame(rdd2).show()
#+---------------------------+-----+
#|ARRAY_COLS |index|
#+---------------------------+-----+
#|[colA_1, colB_11, colC_111]|0 |
#|[colA_2, colB_22, colC_222]|1 |
#+---------------------------+-----+
方法1 使用您定义的现有schema
,而方法 2 可能更紧凑,代码明智(但需要硬编码index=
(。