我尝试使用DataFrameWriter
将DataFrame
以parquet格式保存到HDF,并由三个列值分区,例如:
dataFrame.write.mode(SaveMode.Overwrite).partitionBy("eventdate", "hour", "processtime").parquet(path)
如此问题所述,partitionBy
将删除path
分区的完整层次结构,并用dataFrame
中的分区代替它们。由于特定日期的新增量数据将定期出现,因此我想要的是仅替换dataFrame
具有数据的层次结构中的这些分区,而其他分区则没有受到影响。
singlePartition.write.mode(SaveMode.Overwrite).parquet(path + "/eventdate=2017-01-01/hour=0/processtime=1234567890")
但是,我很难理解将数据组织到单分区DataFrame
S中的最佳方法,以便我可以使用他们的完整路径来写出它们。一个想法是:
dataFrame.repartition("eventdate", "hour", "processtime").foreachPartition ...
但是foreachPartition
在Iterator[Row]
上运行,这不是写入Parquet格式的理想选择。
我还考虑使用select...distinct eventdate, hour, processtime
来获取分区列表,然后通过每个分区过滤原始数据框架,然后将结果保存到其完整的分区路径中。但是,独特的查询和每个分区的过滤器似乎并不是很有效,因为它将是很多过滤/写操作。
我希望有一种更清洁的方法来保存现有分区,dataFrame
没有数据?
感谢您的阅读。
火花版本:2.1
这是一个旧主题,但是我遇到了同样的问题并找到了另一个解决方案,只需通过以下方式将您的分区覆盖模式设置为动态:
spark.conf.set('spark.sql.sources.partitionOverwriteMode', 'dynamic')
所以,我的火花会话是这样的:
spark = SparkSession.builder.appName('AppName').getOrCreate()
spark.conf.set('spark.sql.sources.partitionOverwriteMode', 'dynamic')
模式选项 Append
具有捕获!
df.write.partitionBy("y","m","d")
.mode(SaveMode.Append)
.parquet("/data/hive/warehouse/mydbname.db/" + tableName)
我已经测试过,发现这将保留现有的分区文件。但是,这次的问题是:如果您两次运行相同的代码(使用相同的数据),那么它将创建新的木木quet文件,而不是替换现有数据以获取相同数据(SPARK 1.6)。因此,我们仍然可以使用Overwrite
解决此问题,而不是使用Append
。我们应该在分区级别上覆盖表级别。
df.write.mode(SaveMode.Overwrite)
.parquet("/data/hive/warehouse/mydbname.db/" + tableName + "/y=" + year + "/m=" + month + "/d=" + day)
有关更多信息,请参见以下链接:
Spark DataFrame Write方法中的覆盖特定分区
(我在Suriyanto的评论后更新了我的答复。THNX。)
我知道这很旧。正如我看不到任何解决方案发布的那样,我将继续发布一个。这种方法假设您要写入的目录上有一个蜂巢表。解决此问题的一种方法是从dataFrame
创建一个临时视图,该视图应添加到表中,然后使用普通的Hive类似insert overwrite table ...
命令:
dataFrame.createOrReplaceTempView("temp_view")
spark.sql("insert overwrite table table_name partition ('eventdate', 'hour', 'processtime')select * from temp_view")
它保留了旧分区,而(超过)仅写入新分区。