我有一个spark数据框架,我试图在hive中创建一个分区表。
我有一个标志来说明表是否存在。第一次运行应该创建表,从第二次运行开始,应该在不覆盖现有数据的情况下将数据插入表中。
我的问题是如何创建一个分区表并插入到已经存在的分区表中,而不覆盖现有的数据。
表由一个名为date的列划分。
到目前为止我所尝试的。(没有分区)
df.createOrReplaceTempView("df_view")
if table_exists:
spark.sql("insert into mytable select * from df_view")
else:
spark.sql("create table if not exists mytable as select * from df_view")
但是我必须对分区列-日期做同样的操作。
同一日期可以有多个运行。那么是否可以将数据附加到同一分区而不是覆盖它呢?
预期输出:在第一次运行之后,应该创建一个表,其中分区列作为日期。
Name date timestamp
A. 2021-09-16 2021-09-16 12:00:01
B. 2021-09-16 2021-09-16 12:00:01
在相同日期第二次运行后:(数据应该被附加到相同的分区)
Name date timestamp
A. 2021-09-16 2021-09-16 12:00:01
B. 2021-09-16 2021-09-16 12:00:01
A. 2021-09-16 2021-09-16 12:20:01
B. 2021-09-16 2021-09-16 12:20:01
下次运行第三次:(保留所有现有数据创建新分区)
Name date timestamp
A. 2021-09-16 2021-09-16 12:00:01
B. 2021-09-16 2021-09-16 12:00:01
A. 2021-09-16 2021-09-16 12:20:01
B. 2021-09-16 2021-09-16 12:20:01
A. 2021-09-17 2021-09-17 12:20:01
B. 2021-09-17 2021-09-17 12:20:01
如何在Pyspark中实现
按照文档编写的代码可能是这样的:
df.write.saveAsTable('[table_name_here]',
format='[format_here]',
mode='append',
partitionBy='date')
这段代码不需要检查表是否存在,如果不存在,append
会自动创建。
您可以运行parts = spark.sql('show partitions mytable')
,甚至将其转换为Python列表或Pandas并检查分区是否存在