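I have three PySpark DataFrames. I want to keep references to them in a dictionary, write a loop that performs some operations on each of the three, and then save the results for further analysis. I am stuck on the last step. I have two questions:

- In the code below, how do I access the results in TRANSFORMS? When I run print(TRANSFORMS[0]), all I get is this cryptic output: <function __main__.multi_output(Input_table, table_name='ONE')>
  Is there a mistake in my code, so that the transformation never runs?
- How can I modify the function so that it saves three datasets, such as df_1_result, df_2_result, df_3_result, that I can use in later analysis?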

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.appName('Sparky').getOrCreate()

# Create the initial dataframe
data = [("James","M",60000),("Michael","M",70000),
        ("Robert",None,400000),("Maria","F",500000),
        ("Jen","",None)]
columns = ["name","gender","salary"]
df_when = spark.createDataFrame(data = data, schema = columns)

# Create three identical datasets
df_1 = df_when
df_2 = df_when
df_3 = df_when

TRANSFORMS = []
DATASETS = {
    "ONE" : df_1,
    "TWO" : df_2,
    "THREE" : df_3,
}

for table_name, table_location in list(DATASETS.items()):

    def multi_output(Input_table, table_name=table_name):
        if table_name=="ONE":
            output_table = Input_table.drop("name")
        elif table_name=="TWO":
            output_table== Input_table.drop("gender")
        elif table_name=="THREE":
            output_table = Input_table.drop("salary")
        return output_table

    TRANSFORMS.append(multi_output)

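There are a couple of problems here:

Problem 1: TRANSFORMS.append(multi_output) only appends the function object itself to the TRANSFORMS list. The function is never called, which is why print(TRANSFORMS[0]) shows a function rather than a DataFrame. The function should also be defined once, outside the for loop.

Problem 2: The statement under the second condition has a typo: it uses == (comparison) where = (assignment) was intended, so output_table is never assigned in the "TWO" branch. The corrected script below should work as expected.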
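
Before the full script, here is a minimal plain-Python sketch of problem 1 in isolation. It deliberately avoids Spark: drop_key and the dictionary record are hypothetical stand-ins for DataFrame.drop and a DataFrame, introduced only to show the difference between storing a function and storing its return value.

```python
def drop_key(record, key):
    """Return a copy of `record` without `key` (stand-in for DataFrame.drop)."""
    return {k: v for k, v in record.items() if k != key}


# Hypothetical stand-in for one row of the DataFrame in the question.
record = {"name": "James", "gender": "M", "salary": 60000}

stored_function = []
stored_result = []

# Appending the bare name stores the function object; nothing is executed.
stored_function.append(drop_key)

# Appending a call stores whatever the function returns.
stored_result.append(drop_key(record, "name"))

print(callable(stored_function[0]))  # True: this entry is still a function
print(stored_result[0])              # {'gender': 'M', 'salary': 60000}
```

The same distinction applies to TRANSFORMS: it only holds DataFrames once multi_output is actually called inside the loop.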

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.appName('Sparky').getOrCreate()

# Create the initial dataframe
data = [("James","M",60000),("Michael","M",70000),
        ("Robert",None,400000),("Maria","F",500000),
        ("Jen","",None)]
columns = ["name","gender","salary"]
df_when = spark.createDataFrame(data = data, schema = columns)

# Create three identical datasets
df_1 = df_when
df_2 = df_when
df_3 = df_when

TRANSFORMS = []
DATASETS = {
    "ONE" : df_1,
    "TWO" : df_2,
    "THREE" : df_3,
}

def multi_output(Input_table, table_name):
    output_table = Input_table
    if table_name=="ONE":
        output_table = Input_table.drop("name")
    elif table_name=="TWO":
        output_table = Input_table.drop("gender")
    elif table_name=="THREE":
        output_table = Input_table.drop("salary")
    return output_table

for table_name, table_location in list(DATASETS.items()):
    TRANSFORMS.append(multi_output(table_location, table_name))

len(TRANSFORMS)
TRANSFORMS[0].show()
TRANSFORMS[1].show()
TRANSFORMS[2].show()

+------+------+
|gender|salary|
+------+------+
|     M| 60000|
|     M| 70000|
|  null|400000|
|     F|500000|
|      |  null|
+------+------+

+-------+------+
|   name|salary|
+-------+------+
|  James| 60000|
|Michael| 70000|
| Robert|400000|
|  Maria|500000|
|    Jen|  null|
+-------+------+

+-------+------+
|   name|gender|
+-------+------+
|  James|     M|
|Michael|     M|
| Robert|  null|
|  Maria|     F|
|    Jen|      |
+-------+------+
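
For the second question (keeping the results under names like df_1_result), one possible approach is to collect the outputs in a dictionary keyed by name rather than in a list. The sketch below is an assumption, not part of the original answer: transform_all, drop_column, and COLUMN_TO_DROP are hypothetical names, and plain lists of column names stand in for the DataFrames so that the example runs without a Spark session.

```python
def transform_all(datasets, transform):
    """Call transform(table, name) for every entry; key each result as '<name>_result'."""
    return {f"{name}_result": transform(table, name)
            for name, table in datasets.items()}


# Stand-in transform (assumption): drops one column name per table,
# mirroring what multi_output does with DataFrame.drop.
COLUMN_TO_DROP = {"ONE": "name", "TWO": "gender", "THREE": "salary"}


def drop_column(columns, table_name):
    return [c for c in columns if c != COLUMN_TO_DROP[table_name]]


# Stand-in datasets: lists of column names instead of Spark DataFrames.
datasets = {name: ["name", "gender", "salary"] for name in ("ONE", "TWO", "THREE")}

results = transform_all(datasets, drop_column)
print(sorted(results))        # ['ONE_result', 'THREE_result', 'TWO_result']
print(results["TWO_result"])  # ['name', 'salary']
```

Since multi_output takes the same two arguments (the table, then its name), RESULTS = transform_all(DATASETS, multi_output) should work with the DataFrames from the script above, and df_1_result = RESULTS["ONE_result"] then gives a named variable for later analysis. A dictionary is used here because it keeps the link between each input name and its output, which a positional list loses.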