在多个条件下动态连接多个(超过2个)Pyspark数据帧



我有20个数据帧,我想把它组合成一个包含所有列的帧。我的数据帧看起来像

course_id   course_name
5011    Web Designing
5012    Web Development
5013    Programming

subject_id  subject_name    course_name
221 HTML    5011
222 CSS 5011
223 JavaScript  5011
224 PHP 5012
225 Python  5012
226 .Net    5012
227 Java    5013
228 C++ 5013

chapter_id  chapter_name    subject_id
101 HTML Text   221
102 HTML Image  221
103 HTML List   221
104 HTML Link   221
105 HTML Form   221
106 CSS Border  222
107 CSS Position    222
108 CSS Selector    222
109 PHP conditions  224
110 PHP arrays  224
111 Java Methods    227

所以我有50多个数据帧,所以最后一列每次都在变化。

所以我的问题是,我如何才能制作出如下所示的单一结果数据帧,

course_name subject_name    chapter_name    subchapter_name
Web Designing   HTML    HTML Text   HTML Heading
Web Designing   HTML    HTML Text   HTML Paragraph
Web Designing   HTML    HTML List   HTML Ordered List
Web Designing   HTML    HTML List   HTML Unordered List
Web Designing   HTML    HTML Link   HTML Image Link
Web Designing   HTML    HTML Link   HTML Text Link
Web Designing   CSS CSS Border  CSS Border Color
Web Designing   CSS CSS Border  CSS Border Style
Web Designing   CSS CSS Border  CSS Border Width
Web Designing   CSS CSS Position    CSS Absolute Position
Web Designing   CSS CSS Selector    CSS Element Selector
Web Development PHP PHP Conditions  PHP Switch Condition
Web Development PHP PHP Conditions  PHP if Condition
Web Development PHP PHP Arrays  PHP Associative array
Web Development PHP PHP Arrays  PHP Index Array
Programming Java    Java Methods    ava Method Overloading
Programming Java    Java Methods    ava Method Parameter

我目前使用的代码如下,但由于我们想要使用500个报告,我们需要在Pyspark API中进行。

SELECT courses.course_name, 
subjects.subject_name,
chapters.chapter_name, 
subchapters.subchapter_name 
FROM courses
INNER JOIN subjects ON courses.course_id = subjects.course_id
INNER JOIN chapters ON subjects.subject_id = chapters.subject_id
INNER JOIN subchapters ON chapters.chapter_id = subchapters.chapter_id; 

任何关于如何使用pyspark API完成的帮助。我已经尝试过了,但我想每次加入时都改变加入条件

def f(dfs):
df1 = dfs[0]
for df2 in dfs[1:]:
df1 = df1.join(df2, ["id"])
return df1

但是,您需要显式指定join字段。然后可以对该功能进行轻微修改以满足您的需求。

def f(dfs, join_cols):
df = df1
for i in range(len(dfs)):
df = df.join(dfs[i], join_cols[i])
return df

df_ls = [df2, df3]
join_ls = ['course_id', 'subject_id']
df = f(df_ls, join_ls)
df.show()

我现在已经尝试过了,它是如何工作的,但不知道这是否是一个好的答案。

from pyspark.sql import SparkSession
from pyspark.sql import Row
import pyspark.sql.functions as F
spark = SparkSession.builder.appName('abc').getOrCreate()
lst1 = [[1, 2, 3], ['A', 'B', 'C'], ['aa', 'bb', 'cc']]
lst2 = [[2, 3, 4], ['A', 'B', 'C'], ['aa', 'bb', 'cc']]
lst3 = [[1, 2, 4], ['A', 'B', 'C'], ['aa', 'bb', 'cc']]
R1 = Row("A1", "A2", "A3")
R2 = Row("B1", "B2", "B3")
R3 = Row("C1", "C2", "C3")
df1 = spark.sparkContext.parallelize([R1(*r) for r in zip(*lst1)]).toDF().alias('df1')
df2 = spark.sparkContext.parallelize([R2(*r) for r in zip(*lst2)]).toDF().alias('df2')
df3 = spark.sparkContext.parallelize([R3(*r) for r in zip(*lst3)]).toDF().alias('df3')
list_tup = [(df1, df2, "df1.A1", "df2.B1"),
(df2, df3, "df2.B1", "df3.C1"),
(df1, df3, "df1.A1", "df3.C1")]
df_1 = list_tup[0][0]
for x in list_tup:
df_1 = x[0].join(x[1], on=F.col(x[2]) == F.col(x[3]), how="left_outer")
df_1.show()

+---+---+---+----+----+----+
| A1| A2| A3|  C1|  C2|  C3|
+---+---+---+----+----+----+
|  1|  A| aa|   1|   A|  aa|
|  2|  B| bb|   2|   B|  bb|
|  3|  C| cc|null|null|null|
+---+---+---+----+----+----+

相关内容

  • 没有找到相关文章

最新更新