我想从python字典中创建一个pyspark数据框,但是下面的代码
from pyspark.sql import SparkSession, Row
df_stable = spark.createDataFrame(dict_stable_feature)
df_stable.show()
显示此错误
TypeError: Can not infer schema for type: <class 'str'>
阅读stackoverflow:
Pyspark:由于数据类型str而不是StringType,无法将RDD转换为DataFrame
我可以推断,也许问题是我错误地使用python标准str而不是StringType和spark不喜欢它。我该怎么做才能让它起作用?
编辑:
我使用下面的代码创建了我的字典
创建多个列表并将它们存储到字典Python
如你所见,键是通过 创建的cc = str(col)
vv = "_" + str(value)
cv = cc + vv
dict_stable_feature[cv] = t
而t
只是1
和0
的二进制列表。
让我们从将python字典转换为位于正确位置(即初始化spark数据框所需的数据结构之一)的列表值开始。
假设字典中所有列表值的长度相同,您可以尝试以下操作:
column_names = []
dataset = None
for column_name in dict_stable_feature:
column_names.append(column_name)
column_values = dict_stable_feature[column_name]
# initialize dataset ranges
if dataset is None:
dataset=[]
for i in range(0,len(column_values)):
dataset.append([column_values[i]])
else:
for ind,val in enumerate(column_values):
dataset[ind].append(val)
my_df = sparkSession.createDataFrame(dataset,schema=column_names)
如果所有列表值的长度不相同,那么您可以尝试以下操作:
max_list_length = max([len(dict_stable_feature[k]) for k in dict_stable_feature])
column_names = []
dataset = [[] for i in range(0,max_list_length)]
default_data_value = None # feel free to change
for column_name in dict_stable_feature:
column_names.append(column_name)
column_values = dict_stable_feature[column_name]
for ind,val in enumerate(column_values):
dataset[ind].append(val)
# ensure all columns have the same amount of rows
no_of_values = len(column_values)
if no_of_values < max_list_length:
for i in range(no_of_values,max_list_length):
dataset[i].append(default_data_value)
my_df = sparkSession.createDataFrame(dataset,schema=column_names)
让我知道这是否适合你。