How to convert a SQL UPDATE query into PySpark (table to DataFrame)



I have this UPDATE SQL query that I need to convert to PySpark so it works with DataFrames. I'd like to know whether it's possible to do it with DataFrames and, if so, how.

The SQL query:
UPDATE TBL1
SET COL_C=1
FROM TBL1
INNER JOIN TBL2 ON TBL1.COL_A=TBL2.COL_A AND TBL1.COL_B=TBL2.COL_B
INNER JOIN TBL3 ON TBL2.COL_A=TBL3.COL_A AND TBL2.COL_B=TBL3.COL_B
df_TBL1=TBL1
+-------+--------+----------+------+-----+
|  COL_A|   COL_B|       dob|gender|COL_C|
+-------+--------+----------+------+-----+
|  James|   Smith|1991-04-01|     M| 3000|
|Michael|    Rose|2000-05-19|     M| 4000|
| Robert|Williams|1978-09-05|     M| 4000|
|  Maria|   Jones|1967-12-01|     F| 4000|
|    Jen|   Brown|1980-02-17|     F| 1000|
+-------+--------+----------+------+-----+
df_TBL2=TBL2
+-------+---------+----------+------+-----+
|  COL_A|    COL_B|       dob|gender|COL_C|
+-------+---------+----------+------+-----+
|   John|     Snow|1791-04-01|     M| 9000|
|Michael|     Rose|2000-05-19|     M| 4000|
| Robert|Baratheon|1778-09-05|     M| 9500|
|  Maria|    Jones|1967-12-01|     F| 4000|
+-------+---------+----------+------+-----+
df_TBL3=TBL3 
+--------+------+----------+------+-----+
|   COL_A| COL_B|       dob|gender|COL_C|
+--------+------+----------+------+-----+
| Michael|  Rose|2000-05-19|     M| 4000|
|   Peter|Parker|1978-09-05|     M| 4000|
|   Maria| Jones|1967-12-01|     F| 4000|
|MaryJane| Brown|1980-12-17|     F|10000|
+--------+------+----------+------+-----+
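
For reproducibility, here is a minimal sketch of how these sample DataFrames could be built (assuming an active SparkSession named spark; the rows are copied from the tables above):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
cols = ["COL_A", "COL_B", "dob", "gender", "COL_C"]
df_TBL1 = spark.createDataFrame([
    ("James", "Smith", "1991-04-01", "M", 3000),
    ("Michael", "Rose", "2000-05-19", "M", 4000),
    ("Robert", "Williams", "1978-09-05", "M", 4000),
    ("Maria", "Jones", "1967-12-01", "F", 4000),
    ("Jen", "Brown", "1980-02-17", "F", 1000),
], cols)
df_TBL2 = spark.createDataFrame([
    ("John", "Snow", "1791-04-01", "M", 9000),
    ("Michael", "Rose", "2000-05-19", "M", 4000),
    ("Robert", "Baratheon", "1778-09-05", "M", 9500),
    ("Maria", "Jones", "1967-12-01", "F", 4000),
], cols)
df_TBL3 = spark.createDataFrame([
    ("Michael", "Rose", "2000-05-19", "M", 4000),
    ("Peter", "Parker", "1978-09-05", "M", 4000),
    ("Maria", "Jones", "1967-12-01", "F", 4000),
    ("MaryJane", "Brown", "1980-12-17", "F", 10000),
], cols)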

The join gives me:

from pyspark.sql import functions as spf

df_TBL_ALL = df_TBL1 \
    .join(df_TBL2, (df_TBL1.COL_A == df_TBL2.COL_A) & (df_TBL1.COL_B == df_TBL2.COL_B), how="inner") \
    .join(df_TBL3, (df_TBL2.COL_A == df_TBL3.COL_A) & (df_TBL2.COL_B == df_TBL3.COL_B), how="inner") \
    .select(df_TBL1["*"]) \
    .withColumn("COL_C", spf.lit(1))

Then, I tried to join them:

df_TBL1_JOINED = df_TBL1 \
    .join(df_TBL_ALL, (df_TBL1.COL_A == df_TBL_ALL.COL_A) & (df_TBL1.COL_B == df_TBL_ALL.COL_B), how="left") \
    .select(df_TBL1["*"],
            spf.coalesce(df_TBL_ALL.COL_C, df_TBL1.COL_C).alias("COL_Nova"))
df_TBL1_JOINED.show()
# +-------+--------+----------+------+-----+--------+
# |  COL_A|   COL_B|       dob|gender|COL_C|COL_Nova|
# +-------+--------+----------+------+-----+--------+
# |  James|   Smith|1991-04-01|     M| 3000|    3000|
# |    Jen|   Brown|1980-02-17|     F| 1000|    1000|
# |  Maria|   Jones|1967-12-01|     F| 4000|       1|
# |Michael|    Rose|2000-05-19|     M| 4000|       1|
# | Robert|Williams|1978-09-05|     M| 4000|    4000|
# +-------+--------+----------+------+-----+--------+

But I didn't know how to continue from there.

So I did:

df_TBL1_JOINED = df_TBL1_JOINED \
    .drop("COL_C")
df_TBL1_JOINED = df_TBL1_JOINED \
    .withColumnRenamed("COL_Nova", "COL_C")
df_TBL1_JOINED.show()
df_TBL1 = df_TBL1_JOINED
# +-------+--------+----------+------+-----+
# |  COL_A|   COL_B|       dob|gender|COL_C|
# +-------+--------+----------+------+-----+
# |  James|   Smith|1991-04-01|     M| 3000|
# |    Jen|   Brown|1980-02-17|     F| 1000|
# |  Maria|   Jones|1967-12-01|     F|    1|
# |Michael|    Rose|2000-05-19|     M|    1|
# | Robert|Williams|1978-09-05|     M| 4000|
# +-------+--------+----------+------+-----+

I get the expected result, but I don't know whether this is the most performant way to achieve it.

Expected result: COL_C updated to 1 in all rows of df_TBL1 that appear in the join of df_TBL1 with df_TBL2 and df_TBL3.

df_TBL1:
+-------+--------+----------+------+-----+
|  COL_A|   COL_B|       dob|gender|COL_C|
+-------+--------+----------+------+-----+
|  James|   Smith|1991-04-01|     M| 3000|
|Michael|    Rose|2000-05-19|     M|    1|
| Robert|Williams|1978-09-05|     M| 4000|
|  Maria|   Jones|1967-12-01|     F|    1|
|    Jen|   Brown|1980-02-17|     F| 1000|
+-------+--------+----------+------+-----+

I tried to come up with a concise and performant option. The following does only the 2 necessary joins, avoiding the final join that you used in your question.

from pyspark.sql import functions as F

# True only when the row matched in both TBL2 and TBL3 (F.forall requires Spark 3.1+)
updating = F.forall(F.array('t2', 't3'), lambda x: x)

df_TBL1 = (
    df_TBL1.withColumnRenamed('COL_C', 'COL_C_old').alias('T1')
    # flag columns t2/t3 mark rows that exist in TBL2/TBL3
    .join(df_TBL2.withColumn('t2', F.lit(True)), ['COL_A', 'COL_B'], 'left')
    .join(df_TBL3.withColumn('t3', F.lit(True)), ['COL_A', 'COL_B'], 'left')
    # write 1 where both flags are True, otherwise keep the old value
    .withColumn('updated_c', F.when(updating, 1).otherwise(F.col('COL_C_old')))
    .select('T1.*', F.col('updated_c').alias('COL_C'))
    .drop('COL_C_old')
)
df_TBL1.show()
# +-------+--------+----------+------+-----+
# |  COL_A|   COL_B|       dob|gender|COL_C|
# +-------+--------+----------+------+-----+
# |  James|   Smith|1991-04-01|     M| 3000|
# |    Jen|   Brown|1980-02-17|     F| 1000|
# |  Maria|   Jones|1967-12-01|     F|    1|
# |Michael|    Rose|2000-05-19|     M|    1|
# | Robert|Williams|1978-09-05|     M| 4000|
# +-------+--------+----------+------+-----+

How the update works, line by line

First, the 3 tables are joined together on COL_A and COL_B, each with a small modification: df_TBL1 has COL_C renamed to COL_C_old and gets the alias 'T1' (an extra name that makes it easier to reference the table later); df_TBL2 and df_TBL3 each get an additional column, 't2' and 't3' respectively, which is always True (after the join, these columns indicate that the row exists in those tables).

.withColumnRenamed('COL_C', 'COL_C_old').alias('T1')
+-------+--------+----------+------+---------+
|  COL_A|   COL_B|       dob|gender|COL_C_old|
+-------+--------+----------+------+---------+
|  James|   Smith|1991-04-01|     M|     3000|
|Michael|    Rose|2000-05-19|     M|     4000|
| Robert|Williams|1978-09-05|     M|     4000|
|  Maria|   Jones|1967-12-01|     F|     4000|
|    Jen|   Brown|1980-02-17|     F|     1000|
+-------+--------+----------+------+---------+
.join(df_TBL2.withColumn('t2', F.lit(True)), ['COL_A', 'COL_B'], 'left')
+-------+--------+----------+------+---------+----------+------+-----+----+
|  COL_A|   COL_B|       dob|gender|COL_C_old|       dob|gender|COL_C|  t2|
+-------+--------+----------+------+---------+----------+------+-----+----+
|  James|   Smith|1991-04-01|     M|     3000|      null|  null| null|null|
|    Jen|   Brown|1980-02-17|     F|     1000|      null|  null| null|null|
|  Maria|   Jones|1967-12-01|     F|     4000|1967-12-01|     F| 4000|true|
|Michael|    Rose|2000-05-19|     M|     4000|2000-05-19|     M| 4000|true|
| Robert|Williams|1978-09-05|     M|     4000|      null|  null| null|null|
+-------+--------+----------+------+---------+----------+------+-----+----+
.join(df_TBL3.withColumn('t3', F.lit(True)), ['COL_A', 'COL_B'], 'left')
+-------+--------+----------+------+---------+----------+------+-----+----+----------+------+-----+----+
|  COL_A|   COL_B|       dob|gender|COL_C_old|       dob|gender|COL_C|  t2|       dob|gender|COL_C|  t3|
+-------+--------+----------+------+---------+----------+------+-----+----+----------+------+-----+----+
|  James|   Smith|1991-04-01|     M|     3000|      null|  null| null|null|      null|  null| null|null|
|    Jen|   Brown|1980-02-17|     F|     1000|      null|  null| null|null|      null|  null| null|null|
|  Maria|   Jones|1967-12-01|     F|     4000|1967-12-01|     F| 4000|true|1967-12-01|     F| 4000|true|
|Michael|    Rose|2000-05-19|     M|     4000|2000-05-19|     M| 4000|true|2000-05-19|     M| 4000|true|
| Robert|Williams|1978-09-05|     M|     4000|      null|  null| null|null|      null|  null| null|null|
+-------+--------+----------+------+---------+----------+------+-----+----+----------+------+-----+----+

After the joins, withColumn creates an extra column named 'updated_c': if both 't2' and 't3' are True, 1 is written to 'updated_c'; otherwise the value is taken from 'COL_C_old'. forall checks whether all values in an array satisfy the lambda function. The array is built from columns 't2' and 't3' using the array function. lambda x: x just checks that the value is True (the values are True if the row exists in df_TBL2 and df_TBL3, and null if it does not; we only accept True).

updating = F.forall(F.array('t2', 't3'), lambda x: x)
.withColumn('updated_c', F.when(updating, 1).otherwise(F.col('COL_C_old')))
+-------+--------+----------+------+---------+----------+------+-----+----+----------+------+-----+----+---------+
|  COL_A|   COL_B|       dob|gender|COL_C_old|       dob|gender|COL_C|  t2|       dob|gender|COL_C|  t3|updated_c|
+-------+--------+----------+------+---------+----------+------+-----+----+----------+------+-----+----+---------+
|  James|   Smith|1991-04-01|     M|     3000|      null|  null| null|null|      null|  null| null|null|     3000|
|    Jen|   Brown|1980-02-17|     F|     1000|      null|  null| null|null|      null|  null| null|null|     1000|
|  Maria|   Jones|1967-12-01|     F|     4000|1967-12-01|     F| 4000|true|1967-12-01|     F| 4000|true|        1|
|Michael|    Rose|2000-05-19|     M|     4000|2000-05-19|     M| 4000|true|2000-05-19|     M| 4000|true|        1|
| Robert|Williams|1978-09-05|     M|     4000|      null|  null| null|null|      null|  null| null|null|     4000|
+-------+--------+----------+------+---------+----------+------+-----+----+----------+------+-----+----+---------+
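
To see forall's null handling in isolation, here is a tiny standalone check (F.forall was added in Spark 3.1; spark.range(1) is just a one-row scaffold for evaluating the expressions):

from pyspark.sql import functions as F

demo = spark.range(1).select(
    # every element True -> true
    F.forall(F.array(F.lit(True), F.lit(True)), lambda x: x).alias("both_true"),
    # a null element and no false element -> null, which when() treats as false
    F.forall(F.array(F.lit(True), F.lit(None).cast("boolean")), lambda x: x).alias("one_null"),
)
demo.show()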

Then all columns of df_TBL1 are selected via the alias 'T1.*' (this also picks up 'COL_C_old'), plus one more column: 'updated_c', renamed to 'COL_C'.

.select('T1.*', F.col('updated_c').alias('COL_C'))
+-------+--------+----------+------+---------+-----+
|  COL_A|   COL_B|       dob|gender|COL_C_old|COL_C|
+-------+--------+----------+------+---------+-----+
|  James|   Smith|1991-04-01|     M|     3000| 3000|
|    Jen|   Brown|1980-02-17|     F|     1000| 1000|
|  Maria|   Jones|1967-12-01|     F|     4000|    1|
|Michael|    Rose|2000-05-19|     M|     4000|    1|
| Robert|Williams|1978-09-05|     M|     4000| 4000|
+-------+--------+----------+------+---------+-----+

Finally, drop the no-longer-needed 'COL_C_old'.

.drop('COL_C_old')
+-------+--------+----------+------+-----+
|  COL_A|   COL_B|       dob|gender|COL_C|
+-------+--------+----------+------+-----+
|  James|   Smith|1991-04-01|     M| 3000|
|    Jen|   Brown|1980-02-17|     F| 1000|
|  Maria|   Jones|1967-12-01|     F|    1|
|Michael|    Rose|2000-05-19|     M|    1|
| Robert|Williams|1978-09-05|     M| 4000|
+-------+--------+----------+------+-----+
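
As a design note, the same update can be written without the flag columns by computing the set of matching keys with left_semi joins first. A sketch of this assumed-equivalent variant (not part of the answer above):

from pyspark.sql import functions as F

# keys of TBL1 rows that exist in both TBL2 and TBL3; semi joins keep only left columns
matched = (
    df_TBL1.select('COL_A', 'COL_B').distinct()
    .join(df_TBL2, ['COL_A', 'COL_B'], 'left_semi')
    .join(df_TBL3, ['COL_A', 'COL_B'], 'left_semi')
    .withColumn('matched', F.lit(True))
)
result = (
    df_TBL1.join(matched, ['COL_A', 'COL_B'], 'left')
    # 'matched' is True or null; when() treats null as false, keeping the old COL_C
    .withColumn('COL_C', F.when(F.col('matched'), 1).otherwise(F.col('COL_C')))
    .drop('matched')
)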

You can easily use temp tables together with DataFrames in Spark SQL. A temp table is also a Spark SQL entity, so it works the same way as a DataFrame.

>>> TBL1.show()
+-----+--------+--------+
|COL_A|   COL_B|   COL_C|
+-----+--------+--------+
|    A|       B|  scjsdk|
|   A1|cnddssac|saacjsdk|
|    A|   cndds|  scjsdk|
+-----+--------+--------+
>>> TBL2.show()
+-----+-----------+----------+
|COL_A|      COL_B|     COL_C|
+-----+-----------+----------+
|    A|          B|   scjksdk|
|   A1|cndmmkdssac|sbkaacjsdk|
|    A|      cndds| scjjbjsdk|
+-----+-----------+----------+
>>> TBL3.show()
+-----+-----------+------------+
|COL_A|      COL_B|       COL_C|
+-----+-----------+------------+
|    A|          B|  scjcjbksdk|
|   A1|cndmmkdssac|sbkadaacjsdk|
|    A|      cndds|scjjdwfbjsdk|
+-----+-----------+------------+
>>> TBL1.registerTempTable("TBL1")
>>> TBL2.registerTempTable("TBL2")
>>> TBL3.registerTempTable("TBL3")
>>> final_df=spark.sql("select * FROM TBL1 INNER JOIN TBL2 ON TBL1.COL_A=TBL2.COL_A AND TBL1.COL_B=TBL2.COL_B INNER JOIN TBL3 ON TBL2.COL_A=TBL3.COL_A AND TBL2.COL_B=TBL3.COL_B")
>>> final_df.show()
+-----+-----+------+-----+-----+---------+-----+-----+------------+
|COL_A|COL_B| COL_C|COL_A|COL_B|    COL_C|COL_A|COL_B|       COL_C|
+-----+-----+------+-----+-----+---------+-----+-----+------------+
|    A|    B|scjsdk|    A|    B|  scjksdk|    A|    B|  scjcjbksdk|
|    A|cndds|scjsdk|    A|cndds|scjjbjsdk|    A|cndds|scjjdwfbjsdk|
+-----+-----+------+-----+-----+---------+-----+-----+------------+
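
A side note: registerTempTable has been deprecated since Spark 2.0; createOrReplaceTempView is the drop-in replacement:

>>> TBL1.createOrReplaceTempView("TBL1")
>>> TBL2.createOrReplaceTempView("TBL2")
>>> TBL3.createOrReplaceTempView("TBL3")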

*************** Update COL_C *******************

>>> final_df.withColumn("COL_C",lit(1)).show()
+-----+-----+-----+-----+-----+-----+-----+-----+-----+
|COL_A|COL_B|COL_C|COL_A|COL_B|COL_C|COL_A|COL_B|COL_C|
+-----+-----+-----+-----+-----+-----+-----+-----+-----+
|    A|    B|    1|    A|    B|    1|    A|    B|    1|
|    A|cndds|    1|    A|cndds|    1|    A|cndds|    1|
+-----+-----+-----+-----+-----+-----+-----+-----+-----+

=============== If you want to update only TBL1's COL_C value ===============

>>> final_df=spark.sql("select TBL1.COL_A,TBL1.COL_B,TBL1.COL_C FROM TBL1 INNER JOIN TBL2 ON TBL1.COL_A=TBL2.COL_A AND TBL1.COL_B=TBL2.COL_B INNER JOIN TBL3 ON TBL2.COL
>>> final_df.show()
+-----+-----+------+
|COL_A|COL_B| COL_C|
+-----+-----+------+
|    A|    B|scjsdk|
|    A|cndds|scjsdk|
+-----+-----+------+

************ Update COL_C *****************

>>> final_df.withColumn("COL_C",lit(1)).show()
+-----+-----+-----+
|COL_A|COL_B|COL_C|
+-----+-----+-----+
|    A|    B|    1|
|    A|cndds|    1|
+-----+-----+-----+
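
Note that the inner joins drop the non-matching rows of TBL1, so final_df contains only the updated rows. To reproduce the full UPDATE semantics (non-matching rows keep their old COL_C) in pure Spark SQL, a sketch like the following should work with the temp tables registered above (since COL_C is a string in this sample data, the literal 1 would be coerced to '1'):

>>> updated = spark.sql("""
...     SELECT T1.COL_A, T1.COL_B,
...            CASE WHEN M.COL_A IS NOT NULL THEN 1 ELSE T1.COL_C END AS COL_C
...     FROM TBL1 T1
...     LEFT JOIN (
...         SELECT DISTINCT TBL1.COL_A, TBL1.COL_B
...         FROM TBL1
...         INNER JOIN TBL2 ON TBL1.COL_A = TBL2.COL_A AND TBL1.COL_B = TBL2.COL_B
...         INNER JOIN TBL3 ON TBL2.COL_A = TBL3.COL_A AND TBL2.COL_B = TBL3.COL_B
...     ) M ON T1.COL_A = M.COL_A AND T1.COL_B = M.COL_B
... """)
>>> updated.show()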
