从config - PySpark动态构建数据框架名称



我需要根据配置动态构建最终的数据框架名称(连接final_df和后缀)。当我运行最后提到的代码时,我得到了错误- &;SyntaxError:不能分配给operator&;。但是,如果我用任何其他名称替换每个["final_df"]+'_'+每个["suffix"],它就可以工作了。

数据:

df_source_1 = spark.createDataFrame(
[
(123,10),
(123,15),
(123,20)
],
("cust_id", "value")
)
配置:

config = """
[ 
{
"source_df":"df_source_1",
"suffix": "new", 
"group":["cust_id"],
"final_df": "df_taregt_1"
}
]
"""   

代码:

import json   
for each in json.loads(config):
print("Before=",each['final_df'] ) # str object
print(each["final_df"]+'_'+ each["suffix"]) # df_taregt_1_new , print statement works
each["final_df"]+'_'+ each["suffix"] = eval(each["source_df"]).groupBy(each["group"]).agg(sum("value")) # Errors out. Here I need to assign the dataframe to df_taregt_1_new

有谁能帮忙吗?

您使用字典编写代码:

df_dict = {}
df_dict["df_source_1"] = spark.createDataFrame(
[(123, 10), (123, 15), (123, 20)], ("cust_id", "value")
)
for each in json.loads(config):
df_dict[each["final_df"] + "_" + each["suffix"]] = (
df_dict[each["source_df"]].groupBy(each["group"]).agg(sum("value"))
)

不是处理那些应该是动态创建的对象,而是使用一个字典来存储所有这些对象及其动态名称。你甚至可以测试你的字典来知道一个对象是否存在。

最新更新