I have 20 dataframes and I want to combine them into a single frame that contains all the columns. My dataframes look like this:
course_id  course_name
5011       Web Designing
5012       Web Development
5013       Programming
subject_id  subject_name  course_id
221         HTML          5011
222         CSS           5011
223         JavaScript    5011
224         PHP           5012
225         Python        5012
226         .Net          5012
227         Java          5013
228         C++           5013
chapter_id  chapter_name    subject_id
101         HTML Text       221
102         HTML Image      221
103         HTML List       221
104         HTML Link       221
105         HTML Form       221
106         CSS Border      222
107         CSS Position    222
108         CSS Selector    222
109         PHP conditions  224
110         PHP arrays      224
111         Java Methods    227
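For context, a minimal sketch of how the first three frames could be built with spark.createDataFrame (only a handful of rows shown; the column names follow the tables above):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("courses").getOrCreate()

# Toy versions of the courses, subjects and chapters frames.
courses = spark.createDataFrame(
    [(5011, "Web Designing"), (5012, "Web Development"), (5013, "Programming")],
    ["course_id", "course_name"])
subjects = spark.createDataFrame(
    [(221, "HTML", 5011), (222, "CSS", 5011), (224, "PHP", 5012), (227, "Java", 5013)],
    ["subject_id", "subject_name", "course_id"])
chapters = spark.createDataFrame(
    [(101, "HTML Text", 221), (106, "CSS Border", 222), (109, "PHP conditions", 224)],
    ["chapter_id", "chapter_name", "subject_id"])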
I have more than 50 dataframes like this, and the linking column in the last position changes from frame to frame. So my question is: how can I produce a single result dataframe like the one below?
course_name      subject_name  chapter_name    subchapter_name
Web Designing    HTML          HTML Text       HTML Heading
Web Designing    HTML          HTML Text       HTML Paragraph
Web Designing    HTML          HTML List       HTML Ordered List
Web Designing    HTML          HTML List       HTML Unordered List
Web Designing    HTML          HTML Link       HTML Image Link
Web Designing    HTML          HTML Link       HTML Text Link
Web Designing    CSS           CSS Border      CSS Border Color
Web Designing    CSS           CSS Border      CSS Border Style
Web Designing    CSS           CSS Border      CSS Border Width
Web Designing    CSS           CSS Position    CSS Absolute Position
Web Designing    CSS           CSS Selector    CSS Element Selector
Web Development  PHP           PHP Conditions  PHP Switch Condition
Web Development  PHP           PHP Conditions  PHP if Condition
Web Development  PHP           PHP Arrays      PHP Associative array
Web Development  PHP           PHP Arrays      PHP Index Array
Programming      Java          Java Methods    Java Method Overloading
Programming      Java          Java Methods    Java Method Parameter
The code I am currently using is shown below, but since we need to produce 500 such reports, we need to do this with the PySpark API.
SELECT courses.course_name,
subjects.subject_name,
chapters.chapter_name,
subchapters.subchapter_name
FROM courses
INNER JOIN subjects ON courses.course_id = subjects.course_id
INNER JOIN chapters ON subjects.subject_id = chapters.subject_id
INNER JOIN subchapters ON chapters.chapter_id = subchapters.chapter_id;
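For reference, a hard-coded DataFrame-API equivalent of that SQL would presumably look something like this (a sketch, assuming the frames are named courses, subjects, chapters and subchapters, and that subchapters carries chapter_id and subchapter_name as the SQL implies):

result = (
    courses
    .join(subjects, "course_id")        # INNER JOIN subjects ON course_id
    .join(chapters, "subject_id")       # INNER JOIN chapters ON subject_id
    .join(subchapters, "chapter_id")    # INNER JOIN subchapters ON chapter_id
    .select("course_name", "subject_name", "chapter_name", "subchapter_name")
)
result.show()

With 50+ frames, though, writing every join out by hand is exactly what needs to be avoided, hence the question below.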
Any help on getting this done with the PySpark API would be appreciated. I have tried the code below, but I guess the join condition has to change on every join:
def f(dfs):
    df1 = dfs[0]
    for df2 in dfs[1:]:
        df1 = df1.join(df2, ["id"])
    return df1
You do need to specify the join fields explicitly, though. The function can then be slightly modified to meet your needs:
def f(dfs, join_cols):
    # dfs[0] is the base frame; each following frame is joined on the
    # corresponding column name in join_cols.
    df = dfs[0]
    for next_df, col in zip(dfs[1:], join_cols):
        df = df.join(next_df, col)
    return df

df_ls = [df1, df2, df3]
join_ls = ['course_id', 'subject_id']
df = f(df_ls, join_ls)
df.show()
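The same chaining can also be written as a fold over (dataframe, join column) pairs, which scales naturally to 50+ frames. A minimal sketch, assuming the frames from the question are named courses, subjects, chapters and subchapters and that adjacent frames share the named key column:

from functools import reduce

# Fold each remaining frame into the running result, one pair at a time.
pairs = [(subjects, "course_id"),
         (chapters, "subject_id"),
         (subchapters, "chapter_id")]

result = reduce(lambda acc, p: acc.join(p[0], p[1]), pairs, courses)
result.select("course_name", "subject_name", "chapter_name",
              "subchapter_name").show()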
I have now tried the following and it works, but I don't know whether it is a good answer:
from pyspark.sql import SparkSession
from pyspark.sql import Row
import pyspark.sql.functions as F
spark = SparkSession.builder.appName('abc').getOrCreate()
lst1 = [[1, 2, 3], ['A', 'B', 'C'], ['aa', 'bb', 'cc']]
lst2 = [[2, 3, 4], ['A', 'B', 'C'], ['aa', 'bb', 'cc']]
lst3 = [[1, 2, 4], ['A', 'B', 'C'], ['aa', 'bb', 'cc']]
R1 = Row("A1", "A2", "A3")
R2 = Row("B1", "B2", "B3")
R3 = Row("C1", "C2", "C3")
df1 = spark.sparkContext.parallelize([R1(*r) for r in zip(*lst1)]).toDF().alias('df1')
df2 = spark.sparkContext.parallelize([R2(*r) for r in zip(*lst2)]).toDF().alias('df2')
df3 = spark.sparkContext.parallelize([R3(*r) for r in zip(*lst3)]).toDF().alias('df3')
list_tup = [(df1, df2, "df1.A1", "df2.B1"),
            (df2, df3, "df2.B1", "df3.C1"),
            (df1, df3, "df1.A1", "df3.C1")]
df_1 = list_tup[0][0]
for x in list_tup:
    df_1 = x[0].join(x[1], on=F.col(x[2]) == F.col(x[3]), how="left_outer")
df_1.show()
+---+---+---+----+----+----+
| A1| A2| A3| C1| C2| C3|
+---+---+---+----+----+----+
| 1| A| aa| 1| A| aa|
| 2| B| bb| 2| B| bb|
| 3| C| cc|null|null|null|
+---+---+---+----+----+----+
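Note that the loop above rebuilds df_1 from x[0] on every pass, so only the last tuple's join survives, which is why the output shows just df1's and df3's columns. If the goal is to fold every frame into one running result, a sketch along these lines might be closer (the (frame, left key, right key) triples are just assumptions carried over from list_tup):

steps = [(df2, "A1", "B1"),   # join df2 onto the running result on A1 == B1
         (df3, "B1", "C1")]   # then join df3 onto it on B1 == C1

result = df1
for right_df, left_key, right_key in steps:
    result = result.join(right_df,
                         on=F.col(left_key) == F.col(right_key),
                         how="left_outer")
result.show()

This keeps every earlier join in the result instead of discarding it, which is closer to the cumulative course/subject/chapter/subchapter frame the question asks for.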