Suppose I have a pandas DataFrame in the format below. I have cast everything to string, because I don't want to define a schema for it by hand in order to convert it to a PySpark DataFrame. So I convert the frame like this:
train_pd = X_train_f.astype('string')
train_pd.info(verbose=True)
 #   Column  Dtype
---  ------  -----
 0   col1    string
 1   col2    string
 2   col3    string
 3   col4    string
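For what it's worth, a quick way to confirm that every column really ends up with the pandas 'string' dtype (the info output above is truncated) would be something like this rough check:

# every column should report the pandas StringDtype after astype('string')
print(train_pd.dtypes.unique())             # expect a single string dtype entry
print((train_pd.dtypes == 'string').all())  # expect True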
When I now run the following code, I get the error message below.
training = spark.createDataFrame(train_pd)
TypeError: field col15: Can not merge type <class 'pyspark.sql.types.StructType'> and <class 'pyspark.sql.types.StringType'>
Why? I thought that by casting everything to string I could bypass schema inference.
Sample data
col0 col1 col2 col3 col4 col5 col6 col7 col8 col9 col10 col11 col12 col13 col14 col15 col16 col17 col18 col19 col20 col21 col22 col23 col24 col25 col26 col27 col28 col29 col30 col31 col32 col33 col34 col35 col36 col37 col38 col39 ... col355 col356 col357 col358 col359 col360 col361 col362 col363 col364 col365 col366 col367 col368 col369 col370 col371 col372 col373 col374 col375 col376 col377 col378 col379 col380 col381 col382 col383 col384 col385 col386 col387 col388 col389 col390 col391 col392 col393 col394
DUMMY DUMMY DUMMY DUMMY DUMMY DUMMY DUMMY DUMMY 1144418 0 1908 0 DUMMY DUMMY 50 <NA> <NA> 0 0 0001 DUMMY 2021-11-03 16:51:25 2021-11-03 17:23:13 04 <NA> <NA> <NA> DUMMY DUMMY DUMMY DUMMY DUMMY 7 DUMMY <NA> DUMMY DUMMY <NA> 30 4315 ... 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0
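To narrow down where the StructType in the error comes from, one thing I can do is inspect which Python objects Spark actually sees in the offending column; col15 holds <NA> in the sample row above, and my assumption is that the pd.NA scalar is not inferred as a plain string. A rough sketch:

# which Python types does col15 actually contain?
# Spark infers the field type per value, so a mix of object types
# in one column can produce a merge error like the one above
print(train_pd['col15'].map(type).value_counts())

# how many cells are the pandas NA scalar rather than real strings?
print(train_pd['col15'].isna().sum())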
Before running training = pandas_to_spark(train_pd), I ran this script from Converting Pandas dataframe into Spark dataframe error:
from pyspark.sql.types import *

# Auxiliary functions
def equivalent_type(f):
    if f == 'datetime64[ns]': return TimestampType()
    elif f == 'int64': return LongType()
    elif f == 'int32': return IntegerType()
    elif f == 'float64': return FloatType()
    else: return StringType()

def define_structure(string, format_type):
    try: typo = equivalent_type(format_type)
    except: typo = StringType()
    return StructField(string, typo)

# Given a pandas dataframe, return a Spark dataframe with a schema
# derived from the pandas dtypes.
def pandas_to_spark(pandas_df):
    columns = list(pandas_df.columns)
    types = list(pandas_df.dtypes)
    struct_list = []
    for column, typo in zip(columns, types):
        struct_list.append(define_structure(column, typo))
    p_schema = StructType(struct_list)
    return sqlContext.createDataFrame(pandas_df, p_schema)
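The script is then called like this (a minimal sketch; it assumes an active Spark session, and the sqlContext the script references has to exist first; the setup lines below are illustrative and not part of the linked answer):

from pyspark.sql import SparkSession, SQLContext

# assumption: create the session/context the script expects
spark = SparkSession.builder.getOrCreate()
sqlContext = SQLContext(spark.sparkContext)

# with every column cast to the pandas 'string' dtype, equivalent_type()
# falls through to StringType() for every field, so the generated schema
# should be all-string
training = pandas_to_spark(train_pd)
training.printSchema()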