Save results as DataFrames in a pyspark multi-output loop



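I have three pyspark DataFrames. I want to put the dataset references in a dictionary, write a loop that performs some operations on the three DataFrames, and then save the results for further analysis. I am struggling with that last step, and I have two questions: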

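  1. In the code below, how do I access the results stored in TRANSFORMS? When I run print(TRANSFORMS[0]) I only get this cryptic output: <function __main__.multi_output(Input_table, table_name='ONE')>. Is there an error in my code, so that the transformation never actually ran?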

  2. How can I modify the function so that it saves three datasets, such as df_1_result, df_2_result and df_3_result, that I can use later for further analysis?


import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = SparkSession.builder.appName('Sparky').getOrCreate()

# Create the initial dataframe
data = [("James","M",60000),("Michael","M",70000),
        ("Robert",None,400000),("Maria","F",500000),
        ("Jen","",None)]
columns = ["name","gender","salary"]
df_when = spark.createDataFrame(data = data, schema = columns)
# Create three identical datasets
df_1 = df_when
df_2 = df_when
df_3 = df_when

TRANSFORMS = []
DATASETS = {
    "ONE"   : df_1,
    "TWO"   : df_2,
    "THREE" : df_3,
}
for table_name, table_location in list(DATASETS.items()):
    def multi_output(Input_table, table_name=table_name):
        if table_name=="ONE":
            output_table = Input_table.drop("name")

        elif table_name=="TWO":
            output_table== Input_table.drop("gender")

        elif table_name=="THREE":
            output_table = Input_table.drop("salary")

        return output_table

    TRANSFORMS.append(multi_output)

There are a few problems here:

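Problem 1: TRANSFORMS.append(multi_output) only adds the function object to the TRANSFORMS list. The function is never called, so no transformation is ever applied, which is why print(TRANSFORMS[0]) shows a function instead of a DataFrame. The function should also be defined once, outside the for loop, and called inside the loop with each dataset and its name.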
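
Problem 1 can be reproduced in plain Python, without Spark. In this minimal sketch the names describe, functions and results are hypothetical stand-ins for multi_output and TRANSFORMS: appending describe stores the function object itself (what TRANSFORMS[0] contained), while appending describe("processed") runs the function and stores its return value.

```python
# Plain-Python sketch: appending a function vs. appending the result of calling it.
NAMES = ["ONE", "TWO", "THREE"]

functions = []
results = []
for table_name in NAMES:
    # Defined inside the loop, as in the question; the default argument
    # captures the current loop value, so each stored function keeps its own name.
    def describe(label, table_name=table_name):
        return f"{label}: {table_name}"

    functions.append(describe)              # stores the function, never runs it
    results.append(describe("processed"))   # runs it and stores the return value

print(functions[0])                    # <function describe at 0x...>
print(results)                         # ['processed: ONE', 'processed: TWO', 'processed: THREE']
print(functions[1]("called later"))    # called later: TWO
```

The stored functions are not broken; nothing happens until they are called, which is the same situation as the original TRANSFORMS list.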

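Problem 2: the statement under the second condition has a typo. output_table== Input_table.drop("gender") uses the comparison operator == instead of the assignment =, so output_table is never assigned for "TWO", and calling the function with that name raises UnboundLocalError. The corrected code below fixes both problems; it also initializes output_table = Input_table, so any other table name returns the table unchanged. It should work as expected.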

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = SparkSession.builder.appName('Sparky').getOrCreate()

# Create the initial dataframe
data = [("James","M",60000),("Michael","M",70000),
        ("Robert",None,400000),("Maria","F",500000),
        ("Jen","",None)]
columns = ["name","gender","salary"]
df_when = spark.createDataFrame(data = data, schema = columns)
# Create three identical datasets
df_1 = df_when
df_2 = df_when
df_3 = df_when

TRANSFORMS = []
DATASETS = {
    "ONE"   : df_1,
    "TWO"   : df_2,
    "THREE" : df_3,
}

# Defined once, outside the loop
def multi_output(Input_table, table_name):
    # Default: return the table unchanged for any other name
    output_table = Input_table
    if table_name=="ONE":
        output_table = Input_table.drop("name")

    elif table_name=="TWO":
        output_table = Input_table.drop("gender")  # "=" assigns; "==" only compared

    elif table_name=="THREE":
        output_table = Input_table.drop("salary")

    return output_table

for table_name, table_location in list(DATASETS.items()):
    # Call the function and store the resulting DataFrame, not the function itself
    TRANSFORMS.append(multi_output(table_location,table_name))

len(TRANSFORMS)
TRANSFORMS[0].show()
TRANSFORMS[1].show()
TRANSFORMS[2].show()
+------+------+
|gender|salary|
+------+------+
|     M| 60000|
|     M| 70000|
|  null|400000|
|     F|500000|
|      |  null|
+------+------+
+-------+------+
|   name|salary|
+-------+------+
|  James| 60000|
|Michael| 70000|
| Robert|400000|
|  Maria|500000|
|    Jen|  null|
+-------+------+
+-------+------+
|   name|gender|
+-------+------+
|  James|     M|
|Michael|     M|
| Robert|  null|
|  Maria|     F|
|    Jen|      |
+-------+------+
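
The answer keeps the results in a list, so they are accessed by position (TRANSFORMS[0] and so on). For question 2, one possible sketch, which is an assumption and not part of the original answer, keeps each result under its dataset name in a dictionary. COLUMN_TO_DROP and build_results are hypothetical names, and multi_output here is a variant of the answer's function that uses a lookup table instead of the if/elif chain. The code only relies on the input having a .drop(column) method, which pyspark.sql.DataFrame provides.

```python
# Hypothetical lookup table replacing the if/elif chain:
# dataset name -> column to drop.
COLUMN_TO_DROP = {
    "ONE": "name",
    "TWO": "gender",
    "THREE": "salary",
}


def multi_output(input_table, table_name):
    """Return input_table without the column configured for table_name.

    Only relies on input_table having a .drop(column) method, as
    pyspark.sql.DataFrame does. An unknown table_name raises KeyError.
    """
    return input_table.drop(COLUMN_TO_DROP[table_name])


def build_results(datasets):
    """Apply multi_output to every (name, table) pair and key the results by name.

    The returned dict keeps the insertion order of `datasets`.
    """
    return {name: multi_output(table, name) for name, table in datasets.items()}
```

With the DATASETS dictionary from the answer, RESULTS = build_results(DATASETS) followed by df_1_result = RESULTS["ONE"] (and likewise for "TWO" and "THREE") gives named DataFrames for later analysis. If separate variables are all that is needed, unpacking the answer's list also works, because dictionaries keep insertion order in Python 3.7+: df_1_result, df_2_result, df_3_result = TRANSFORMS. Unlike the answer's version, this multi_output raises KeyError for a name missing from COLUMN_TO_DROP instead of silently returning the table unchanged.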
