在pySpark日期框中创建CreateOrReplaceTempView的另一种方法



我使用 spark.sql 创建了一个连接条件,并在创建的特定数据帧上创建了一个临时视图。但是,我得到了建筑师的评论,不要创建临时视图,而是使用数据帧本身。那么,我如何以另一种编码方式实现以下目标:

df1 = spark.sql(" select a.col1, a.col2, b.col1, b.col2,
"rank() over(partition by b.bkeyid order by load_time desc) as rank "
"from table1 a inner join table2 b "
"on a.bkeyid = b.bkeyid")
df2 = df1.where(df1.rank == lit(1))   # Using rank to get most current records from the table b
df2.createOrReplaceTempView("new_table")

从new_table开始,我必须将其与另一个表3连接起来,例如:

df3 = spark.sql(" select a.col1, a.col2, b.col1, b.col2,
"rank() over(partition by b.bkeyid order by load_time desc) as rank "
"from new_table a inner join table3 b "
"on a.bkeyid = b.bkeyid")
df4 = df3.where(df1.rank == lit(1))
df4.createOrReplaceTempView("new_table2")

我必须使用new_table2使用提供的映射逻辑创建目标表。 问题是我如何以另一种格式实现上述SQL编码。请帮我更新我的编码部分?? 谢谢

我想你想使用数据帧API来调用相同而不是SQL查询。 由于无法为您编写确切的代码,但我提供了有问题的查询的第二部分,或者如果需要,您可以以类似的方式更改第一个查询

from pyspark.sql.window import Window
import pyspark.sql.functions as f
df1 = spark.sql(" select * from (select a.col1, a.col2, b.col1, b.col2,
rank() over(partition by b.bkeyid order by load_time desc) as rnk 
from table1 a inner join table2 b 
on a.bkeyid = b.bkeyid ) c where c.rnk=1")
table3_df =  spark.sql(select * from table3)

new_df2=df1.join(table3_df,df1.bkeyid=table3.bkeyid).select('col1','col2','col3','col4').withColumn("rank", rank().over(Window.partitionBy(df1['bkeyid']).orderBy(df1['load_time'].desc())))
new_df3=filter(new_df2['rank']==1).select('col1','col2','col3','col4').show()

最新更新