我在Hive上有一个表,我正在尝试在该表中插入数据。
我正在从SQL中获取数据,但我不想插入Hive表中已经存在的id。我正在尝试使用相同的条件,就像不存在的地方一样。我在Airflow上使用PySpark。
Spark 中不存在 exists
运算符,但有 2 个连接运算符可以替换它:left_anti
和 left_semi
。
例如,如果要在配置单元表中插入数据帧df
target
,则可以执行以下操作:
new_df = df.join(
spark.table("target"),
how='left_anti',
on='id'
)
然后你在表格中写new_df
。
left_anti
允许您仅保留不满足连接条件的行(相当于 not exists
(。相当于exists
是left_semi
。
您可以通过临时视图直接使用 spark SQL 在数据帧上使用not exist
:
table_withNull_df.createOrReplaceTempView("table_withNull")
tblA_NoNull_df.createOrReplaceTempView("tblA_NoNull")
result_df = spark.sql("""
select * from table_withNull
where not exists
(select 1 from
tblA_NoNull
where table_withNull.id = tblA_NoNull.id)
""")
此方法可以优先于左反联接,因为它们可能导致意外的 BroadcastNestedLoopJoin,从而导致广播超时(即使没有在反联接中显式请求广播(。
之后,您可以执行write.mode("append")
以插入以前未遇到的数据。
从这里取的例子
恕我直言,我认为 Spark 中不存在这样的属性。我认为您可以使用两种方法:
-
UNIQUE
条件(关系数据库的典型特征(的解决方法:这样,当您尝试插入(append
模式下(已经存在的记录时,您将获得可以正确处理的异常。 -
读取要写入的表,使用要添加到上述表中的数据
outer join
它,然后将结果写入overwrite mode
(但我认为第一个解决方案的性能可能更好(。
欲了解更多详情,请随时询问