插入已排序冰山表时避免洗牌



我用

创建了一个冰山表
CREATE TABLE catalog.db.table (a int, b int) USING iceberg

然后对它进行排序

ALTER TABLE catalog.db.table WRITE ORDERED BY (a, b)

调用最后一条命令后,SHOW TBLPROPERTIES catalog.db.table开始显示write.distribution-mode: range属性:

|sort-order             |a ASC NULLS FIRST, b ASC NULLS FIRST|
|write.distribution-mode|range                               |

现在我正在向表中写入数据:

df = spark.createDataFrame([(i, i*4) for i in range(100000)], ["a", "b"]).coalesce(1).sortWithinPartitions("a", "b")
df.writeTo("datalakelocal.ixanezis.table").append()

我认为这应该在spark中创建一个单一的任务,它将对数据框中的所有数据进行排序(实际上它自创建以来就进行了排序),然后将其作为单个文件插入表中。

不幸的是,在写入的那一刻,spark决定重新分区所有数据,这导致了洗牌。我认为这是由于write.distribution-mode: range,它是自动设置的。

== Physical Plan ==
AppendData (6)
+- * Sort (5)
+- Exchange (4)    # :(
+- * Project (3)
+- Coalesce (2)
+- * Scan ExistingRDD (1)

是否有一种方法可以插入新数据,但也避免不必要的洗牌?

根据Apache Iceberg文档,WRITE ORDERED BY执行以下操作:

冰山表可以配置排序顺序,用于在某些引擎中自动对写入表的数据进行排序。例如,Spark中的MERGE INTO将使用表排序。

现在,用下面的代码创建和编写表:

df = spark.createDataFrame([(i, i*4) for i in range(100000)], ["a", "b"]).coalesce(1).sortWithinPartitions("a", "b")
df.writeTo("datalakelocal.ixanezis.table").append()

排序数据帧需要shuffle操作。您已经使用了sortWithinPartitions,它可以对数据进行排序,但仅在分区内。因此,这并不能完成冰山表所要求的完整排序操作。

因此,您需要另一个完整的shuffle操作来完成您的完整排序。

最新更新