有什么方法可以获得DataFrame的当前分区数吗?我检查了DataFrame javadoc(spark 1.6),但没有找到一个方法,或者我只是错过了它?(在JavaRDD的情况下,有一个getNumPartitions()方法。)
您需要在DataFrame的底层RDD上调用getNumPartitions()
,例如df.rdd.getNumPartitions()
。在Scala的情况下,这是一个无参数的方法:df.rdd.getNumPartitions
。
dataframe.rdd.partitions.size
是除df.rdd.getNumPartitions()
或df.rdd.length
之外的另一种选择。
让我用一个完整的例子来解释。。。
val x = (1 to 10).toList
val numberDF = x.toDF(“number”)
numberDF.rdd.partitions.size // => 4
为了证明我们在上面得到了多少个分区。。。将该数据帧保存为csv
numberDF.write.csv(“/Users/Ram.Ghadiyaram/output/numbers”)
以下是如何在不同的分区上分离数据。
Partition 00000: 1, 2
Partition 00001: 3, 4, 5
Partition 00002: 6, 7
Partition 00003: 8, 9, 10
更新:
@Hemanth在评论中问了一个很好的问题。。。基本上为什么数字在上述情况下,分区数为4
简短回答:取决于执行的情况。由于我使用了local[4],我得到了4个分区。
长答案:
我在本地机器上运行了上面的程序,并使用master作为本地[4],基于它作为4分区。
val spark = SparkSession.builder()
.appName(this.getClass.getName)
.config("spark.master", "local[4]").getOrCreate()
如果它在主纱线中的火花壳,我得到的分区数为2
示例:spark-shell --master yarn
并再次键入相同的命令
scala> val x = (1 to 10).toList
x: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> val numberDF = x.toDF("number")
numberDF: org.apache.spark.sql.DataFrame = [number: int]
scala> numberDF.rdd.partitions.size
res0: Int = 2
- 这里2是默认的火花谈判
- 基于hashpartitioner,spark将决定分配多少个分区。如果您在
--master local
中运行并且基于您的Runtime.getRuntime.availableProcessors()
即它将尝试分配的CCD_ 10这些数量的分区。如果您的可用处理器数量是12(即local[Runtime.getRuntime.availableProcessors()])
,并且您有1到10的列表,那么将只创建10个分区
注意:
如果你在一台12核笔记本电脑上,我正在执行spark程序,默认情况下分区/任务的数量是所有可用内核的数量,即12个。那个表示
local[*]
或s"local[${Runtime.getRuntime.availableProcessors()}]")
,但在此如果只有10个数字,则限制为10
记住所有这些指针,我建议你尝试自己的
转换为RDD,然后获得分区长度
DF.rdd.partitions.length
val df = Seq(
("A", 1), ("B", 2), ("A", 3), ("C", 1)
).toDF("k", "v")
df.rdd.getNumPartitions
获取分区数量的一种更有趣的方法是"使用mapPartitions"转换。示例代码-
val x = (1 to 10).toList
val numberDF = x.toDF()
numberDF.rdd.mapPartitions(x => Iterator[Int](1)).sum()
欢迎星火专家对其性能发表评论。