通过pyspark dataframe创建hive管理分区表,每次运行追加数据



我有一个spark数据框架,我试图在hive中创建一个分区表。

我有一个标志来说明表是否存在。第一次运行应该创建表,从第二次运行开始,应该在不覆盖现有数据的情况下将数据插入表中。

我的问题是如何创建一个分区表并插入到已经存在的分区表中,而不覆盖现有的数据。

表由一个名为date的列划分。

到目前为止我所尝试的。(没有分区)

df.createOrReplaceTempView("df_view")
if table_exists:
spark.sql("insert into mytable select * from df_view")
else:
spark.sql("create table if not exists mytable as select * from df_view")

但是我必须对分区列-日期做同样的操作。

同一日期可以有多个运行。那么是否可以将数据附加到同一分区而不是覆盖它呢?

预期输出:在第一次运行之后,应该创建一个表,其中分区列作为日期。

Name date        timestamp
A.   2021-09-16  2021-09-16 12:00:01
B.   2021-09-16  2021-09-16 12:00:01

在相同日期第二次运行后:(数据应该被附加到相同的分区)

Name date        timestamp
A.   2021-09-16  2021-09-16 12:00:01
B.   2021-09-16  2021-09-16 12:00:01
A.   2021-09-16  2021-09-16 12:20:01
B.   2021-09-16  2021-09-16 12:20:01

下次运行第三次:(保留所有现有数据创建新分区)

Name date        timestamp
A.   2021-09-16  2021-09-16 12:00:01
B.   2021-09-16  2021-09-16 12:00:01
A.   2021-09-16  2021-09-16 12:20:01
B.   2021-09-16  2021-09-16 12:20:01
A.   2021-09-17  2021-09-17 12:20:01
B.   2021-09-17  2021-09-17 12:20:01

如何在Pyspark中实现

按照文档编写的代码可能是这样的:

df.write.saveAsTable('[table_name_here]', 
format='[format_here]', 
mode='append', 
partitionBy='date')

这段代码不需要检查表是否存在,如果不存在,append会自动创建。

您可以运行parts = spark.sql('show partitions mytable'),甚至将其转换为Python列表或Pandas并检查分区是否存在

相关内容

最新更新