如何在不删除没有新数据的情况下删除分区而在火花中进行分区和编写数据框



我尝试使用DataFrameWriterDataFrame以parquet格式保存到HDF,并由三个列值分区,例如:

dataFrame.write.mode(SaveMode.Overwrite).partitionBy("eventdate", "hour", "processtime").parquet(path)

如此问题所述,partitionBy将删除path分区的完整层次结构,并用dataFrame中的分区代替它们。由于特定日期的新增量数据将定期出现,因此我想要的是仅替换dataFrame具有数据的层次结构中的这些分区,而其他分区则没有受到影响。

为此
singlePartition.write.mode(SaveMode.Overwrite).parquet(path + "/eventdate=2017-01-01/hour=0/processtime=1234567890")

但是,我很难理解将数据组织到单分区DataFrame S中的最佳方法,以便我可以使用他们的完整路径来写出它们。一个想法是:

dataFrame.repartition("eventdate", "hour", "processtime").foreachPartition ...

但是foreachPartitionIterator[Row]上运行,这不是写入Parquet格式的理想选择。

我还考虑使用select...distinct eventdate, hour, processtime来获取分区列表,然后通过每个分区过滤原始数据框架,然后将结果保存到其完整的分区路径中。但是,独特的查询和每个分区的过滤器似乎并不是很有效,因为它将是很多过滤/写操作。

我希望有一种更清洁的方法来保存现有分区,dataFrame没有数据?

感谢您的阅读。

火花版本:2.1

这是一个旧主题,但是我遇到了同样的问题并找到了另一个解决方案,只需通过以下方式将您的分区覆盖模式设置为动态:

spark.conf.set('spark.sql.sources.partitionOverwriteMode', 'dynamic')

所以,我的火花会话是这样的:

spark = SparkSession.builder.appName('AppName').getOrCreate()
spark.conf.set('spark.sql.sources.partitionOverwriteMode', 'dynamic')

模式选项 Append具有捕获!

df.write.partitionBy("y","m","d")
.mode(SaveMode.Append)
.parquet("/data/hive/warehouse/mydbname.db/" + tableName)

我已经测试过,发现这将保留现有的分区文件。但是,这次的问题是:如果您两次运行相同的代码(使用相同的数据),那么它将创建新的木木quet文件,而不是替换现有数据以获取相同数据(SPARK 1.6)。因此,我们仍然可以使用Overwrite解决此问题,而不是使用Append。我们应该在分区级别上覆盖表级别。

df.write.mode(SaveMode.Overwrite)
.parquet("/data/hive/warehouse/mydbname.db/" + tableName + "/y=" + year + "/m=" + month + "/d=" + day)

有关更多信息,请参见以下链接:

Spark DataFrame Write方法中的覆盖特定分区

(我在Suriyanto的评论后更新了我的答复。THNX。)

我知道这很旧。正如我看不到任何解决方案发布的那样,我将继续发布一个。这种方法假设您要写入的目录上有一个蜂巢表。解决此问题的一种方法是从dataFrame创建一个临时视图,该视图应添加到表中,然后使用普通的Hive类似insert overwrite table ...命令:

dataFrame.createOrReplaceTempView("temp_view")
spark.sql("insert overwrite table table_name partition ('eventdate', 'hour', 'processtime')select * from temp_view")

它保留了旧分区,而(超过)仅写入新分区。

相关内容

  • 没有找到相关文章

最新更新