获取DataFrame的当前分区数



有什么方法可以获得DataFrame的当前分区数吗?我检查了DataFrame javadoc(spark 1.6),但没有找到一个方法,或者我只是错过了它?(在JavaRDD的情况下,有一个getNumPartitions()方法。)

您需要在DataFrame的底层RDD上调用getNumPartitions(),例如df.rdd.getNumPartitions()。在Scala的情况下,这是一个无参数的方法:df.rdd.getNumPartitions

dataframe.rdd.partitions.size是除df.rdd.getNumPartitions()df.rdd.length之外的另一种选择。

让我用一个完整的例子来解释。。。

val x = (1 to 10).toList
val numberDF = x.toDF(“number”)
numberDF.rdd.partitions.size // => 4

为了证明我们在上面得到了多少个分区。。。将该数据帧保存为csv

numberDF.write.csv(“/Users/Ram.Ghadiyaram/output/numbers”)

以下是如何在不同的分区上分离数据。

Partition 00000: 1, 2
Partition 00001: 3, 4, 5
Partition 00002: 6, 7
Partition 00003: 8, 9, 10

更新:

@Hemanth在评论中问了一个很好的问题。。。基本上为什么数字在上述情况下,分区数为4

简短回答:取决于执行的情况。由于我使用了local[4],我得到了4个分区。

长答案:

我在本地机器上运行了上面的程序,并使用master作为本地[4],基于它作为4分区。

val spark = SparkSession.builder()
.appName(this.getClass.getName)
.config("spark.master", "local[4]").getOrCreate()

如果它在主纱线中的火花壳,我得到的分区数为2

示例:spark-shell --master yarn并再次键入相同的命令

scala> val x = (1 to 10).toList
x: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

scala> val numberDF = x.toDF("number")
numberDF: org.apache.spark.sql.DataFrame = [number: int]
scala> numberDF.rdd.partitions.size
res0: Int = 2
  • 这里2是默认的火花谈判
  • 基于hashpartitioner,spark将决定分配多少个分区。如果您在--master local中运行并且基于您的Runtime.getRuntime.availableProcessors()即它将尝试分配的CCD_ 10这些数量的分区。如果您的可用处理器数量是12(即local[Runtime.getRuntime.availableProcessors()]),并且您有1到10的列表,那么将只创建10个分区

注意:

如果你在一台12核笔记本电脑上,我正在执行spark程序,默认情况下分区/任务的数量是所有可用内核的数量,即12个。那个表示local[*]s"local[${Runtime.getRuntime.availableProcessors()}]"),但在此如果只有10个数字,则限制为10

记住所有这些指针,我建议你尝试自己的

转换为RDD,然后获得分区长度

DF.rdd.partitions.length
val df = Seq(
("A", 1), ("B", 2), ("A", 3), ("C", 1)
).toDF("k", "v")
df.rdd.getNumPartitions

获取分区数量的一种更有趣的方法是"使用mapPartitions"转换。示例代码-

val x = (1 to 10).toList
val numberDF = x.toDF()
numberDF.rdd.mapPartitions(x => Iterator[Int](1)).sum()

欢迎星火专家对其性能发表评论。

相关内容

  • 没有找到相关文章

最新更新