插入如果不存在,则在Spark SQL中更新



在Spark SQL中是否有任何"INSERT IF NOT EXISTELSE UPDATE"的规定。

我有一些记录的Spark SQL表"ABC"。 然后我有另一批记录,我想根据它们是否存在在此表中在此表中插入/更新。

是否有可以在 SQL 查询中使用的 SQL 命令来实现此目的?

在常规 Spark 中,这可以通过一个join后跟如下map来实现:

import spark.implicits._
val df1 = spark.sparkContext.parallelize(List(("id1", "orginal"), ("id2", "original"))).toDF("df1_id", "df1_status")
val df2 = spark.sparkContext.parallelize(List(("id1", "new"), ("id3","new"))).toDF("df2_id", "df2_status")
val df3 = df1
.join(df2, 'df1_id === 'df2_id, "outer")
.map(row => {
if (row.isNullAt(2))
(row.getString(0), row.getString(1))
else
(row.getString(2), row.getString(3))
})

这会产生:

scala> df3.show
+---+--------+
| _1|      _2|
+---+--------+
|id3|     new| 
|id1|     new|
|id2|original|
+---+--------+

您也可以将selectudfs一起使用而不是map,但在这种特殊情况下使用空值,我个人更喜欢map变体。

你可以像这样使用Spark SQL:

select * from (select c.*, row_number() over (partition by tac  order by tag desc) as 
TAG_NUM from (
select 
a.tac
,a.name
,0 as tag
from tableA a
union all
select 
b.tac
,b.name
,1 as tag
from tableB b) c ) d where TAG_NUM=1

tac 是您要插入/更新的列。

我知道现在分享我的代码有点晚了,但是要添加或更新我的数据库,我做了一个看起来像这样的功能:

import pandas as pd
#Returns a spark dataframe with added and updated datas
#key parameter is the primary key of the dataframes
#The two parameters dfToUpdate and dfToAddAndUpdate are spark dataframes
def AddOrUpdateDf(dfToUpdate,dfToAddAndUpdate,key):
#Cast the spark dataframe dfToUpdate to pandas dataframe
dfToUpdatePandas = dfToUpdate.toPandas()
#Cast the spark dataframe dfToAddAndUpdate to pandas dataframe
dfToAddAndUpdatePandas = dfToAddAndUpdate.toPandas()
#Update the table records with the latest records, and adding new records if there are new records.
AddOrUpdatePandasDf = pd.concat([dfToUpdatePandas,dfToAddAndUpdatePandas]).drop_duplicates([key], keep = 'last').sort_values(key)
#Cast back to get a spark dataframe
AddOrUpdateDf = spark.createDataFrame(AddOrUpdatePandasDf)
return AddOrUpdateDf

如您所见,我们需要将 Spark 数据帧转换为 pandas 数据帧,以便能够执行 pd.concat,尤其是带有"keep = 'last'"的drop_duplicates,然后我们转换回 Spark 数据帧并返回它。 我不认为这是处理AddOrUpdate的最佳方式,但至少,它可以工作。

最新更新