我用
创建了一个冰山表CREATE TABLE catalog.db.table (a int, b int) USING iceberg
然后对它进行排序
ALTER TABLE catalog.db.table WRITE ORDERED BY (a, b)
调用最后一条命令后,SHOW TBLPROPERTIES catalog.db.table
开始显示write.distribution-mode: range
属性:
|sort-order |a ASC NULLS FIRST, b ASC NULLS FIRST|
|write.distribution-mode|range |
现在我正在向表中写入数据:
df = spark.createDataFrame([(i, i*4) for i in range(100000)], ["a", "b"]).coalesce(1).sortWithinPartitions("a", "b")
df.writeTo("datalakelocal.ixanezis.table").append()
我认为这应该在spark中创建一个单一的任务,它将对数据框中的所有数据进行排序(实际上它自创建以来就进行了排序),然后将其作为单个文件插入表中。
不幸的是,在写入的那一刻,spark决定重新分区所有数据,这导致了洗牌。我认为这是由于write.distribution-mode: range
,它是自动设置的。
== Physical Plan ==
AppendData (6)
+- * Sort (5)
+- Exchange (4) # :(
+- * Project (3)
+- Coalesce (2)
+- * Scan ExistingRDD (1)
是否有一种方法可以插入新数据,但也避免不必要的洗牌?
根据Apache Iceberg文档,WRITE ORDERED BY
执行以下操作:
冰山表可以配置排序顺序,用于在某些引擎中自动对写入表的数据进行排序。例如,Spark中的MERGE INTO将使用表排序。
现在,用下面的代码创建和编写表:
df = spark.createDataFrame([(i, i*4) for i in range(100000)], ["a", "b"]).coalesce(1).sortWithinPartitions("a", "b")
df.writeTo("datalakelocal.ixanezis.table").append()
排序数据帧需要shuffle操作。您已经使用了sortWithinPartitions
,它可以对数据进行排序,但仅在分区内。因此,这并不能完成冰山表所要求的完整排序操作。
因此,您需要另一个完整的shuffle操作来完成您的完整排序。