如何在 pyspark 中使用"where not exists" SQL 条件?



我在Hive上有一个表,我正在尝试在该表中插入数据。
我正在从SQL中获取数据,但我不想插入Hive表中已经存在的id。我正在尝试使用相同的条件,就像不存在的地方一样。我在Airflow上使用PySpark

Spark 中不存在 exists 运算符,但有 2 个连接运算符可以替换它:left_antileft_semi

例如,如果要在配置单元表中插入数据帧df target,则可以执行以下操作:

new_df = df.join(
    spark.table("target"),
    how='left_anti',
    on='id'
)

然后你在表格中写new_df

left_anti允许您仅保留不满足连接条件的行(相当于 not exists (。相当于existsleft_semi

您可以通过临时视图直接使用 spark SQL 在数据帧上使用not exist

table_withNull_df.createOrReplaceTempView("table_withNull")
tblA_NoNull_df.createOrReplaceTempView("tblA_NoNull")
result_df = spark.sql("""
select * from table_withNull 
where not exists 
(select 1 from 
tblA_NoNull 
where table_withNull.id = tblA_NoNull.id)
""")

此方法可以优先于左反联接,因为它们可能导致意外的 BroadcastNestedLoopJoin,从而导致广播超时(即使没有在反联接中显式请求广播(。

之后,您可以执行write.mode("append")以插入以前未遇到的数据。

从这里取的例子

恕我直言,我认为 Spark 中不存在这样的属性。我认为您可以使用两种方法:

  1. UNIQUE条件(关系数据库的典型特征(的解决方法:这样,当您尝试插入(append模式下(已经存在的记录时,您将获得可以正确处理的异常。

  2. 读取要写入的表,使用要添加到上述表中的数据outer join它,然后将结果写入overwrite mode(但我认为第一个解决方案的性能可能更好(。

欲了解更多详情,请随时询问

相关内容

  • 没有找到相关文章

最新更新