pyspark中对列的重新分区如何影响分区数



我有一个有一百万条记录的数据帧。它看起来像这个-

df.show()
+--------------------+--------------------++-------------
|            feature1|            feature2| domain    |
+--------------------+--------------------++-------------
|[2.23668528E8, 1....|[2.23668528E8, 1....| domain1   | 
|[2.23668528E8, 1....|[2.23668528E8, 1....| domain2   |
|[2.23668528E8, 1....|[2.23668528E8, 1....| domain1   |
|[2.23668528E8, 1....|[2.23668528E8, 1....| domain2   |
|[2.23668528E8, 1....|[2.23668528E8, 1....| domain1   |

理想的分区大小在spark中是128MB,让我们假设域列有两个唯一的值(domain1和domain2(,考虑到这一点,我有两个问题-

  1. 如果我执行df.repartition("domain"),并且一个分区无法容纳特定域密钥的所有数据,应用程序会失败吗?还是会根据数据自动创建合适的分区?

  2. 假设在上面的数据中,已经根据域密钥进行了重新分区,因此将有两个分区(唯一的密钥是domain1和domain2(。现在假设domain1和domain2被重复1000000次,我将基于域进行自加入。因此,对于每个领域,我将得到大约10^12记录。考虑到我们有两个分区,并且在连接过程中分区的数量没有变化,这两个新分区是否能够处理1000000条记录?

答案取决于数据的大小。当一个分区不能保存属于一个分区值(例如domain1(的所有数据时,将创建更多的分区,最多spark.sql.shuffle.partitions个。如果您的数据太大,即一个分区将超过2GB的限制(另请参阅为什么Spark RDD分区对HDFS有2GB限制?以了解相关解释(,则重新分区将导致OutOfMemoryError
作为提供完整答案的附带说明:能够将数据放入一个分区并不一定意味着一个分区值只生成一个分区。这取决于执行器的数量以及之前如何对数据进行分区。Spark将尽量避免不必要的混洗,因此可以为一个分区值生成多个分区。

因此,为了防止作业失败,您应该调整spark.sql.shuffle.partitions或将所需的分区数与分区列一起传递给repartition

最新更新