从python字典创建pyspark数据框



我想从python字典中创建一个pyspark数据框,但是下面的代码

from pyspark.sql import SparkSession, Row
df_stable = spark.createDataFrame(dict_stable_feature)
df_stable.show()

显示此错误

TypeError: Can not infer schema for type: <class 'str'>

阅读stackoverflow:

Pyspark:由于数据类型str而不是StringType,无法将RDD转换为DataFrame

我可以推断,也许问题是我错误地使用python标准str而不是StringType和spark不喜欢它。我该怎么做才能让它起作用?

编辑:

我使用下面的代码创建了我的字典

创建多个列表并将它们存储到字典Python

如你所见,键是通过 创建的
cc = str(col)
vv = "_" + str(value)
cv = cc + vv
dict_stable_feature[cv] = t

t只是10的二进制列表。

让我们从将python字典转换为位于正确位置(即初始化spark数据框所需的数据结构之一)的列表值开始。

假设字典中所有列表值的长度相同,您可以尝试以下操作:

column_names = []
dataset = None
for column_name in dict_stable_feature:
column_names.append(column_name)
column_values = dict_stable_feature[column_name]
# initialize dataset ranges
if dataset is None:
dataset=[]

for i in range(0,len(column_values)):
dataset.append([column_values[i]])
else:
for ind,val in enumerate(column_values):
dataset[ind].append(val)
my_df = sparkSession.createDataFrame(dataset,schema=column_names)

如果所有列表值的长度不相同,那么您可以尝试以下操作:

max_list_length = max([len(dict_stable_feature[k]) for k in dict_stable_feature])
column_names = []
dataset = [[] for i in range(0,max_list_length)]
default_data_value = None # feel free to change
for column_name in dict_stable_feature:
column_names.append(column_name)
column_values = dict_stable_feature[column_name]

for ind,val in enumerate(column_values):
dataset[ind].append(val)
# ensure all columns have the same amount of rows
no_of_values = len(column_values)
if  no_of_values < max_list_length:
for i in range(no_of_values,max_list_length):
dataset[i].append(default_data_value)
my_df = sparkSession.createDataFrame(dataset,schema=column_names)

让我知道这是否适合你。

最新更新