apache spark -如何找到一个RDD的大小



我有RDD[Row],它需要持久化到第三方存储库。但是这个第三方存储库在单个调用中最多接受5mb。

所以我想根据RDD中存在的数据的大小来创建分区,而不是基于RDD中存在的行数。

我如何找到RDD的大小并基于它创建分区?

正如Justin和Wang提到的,要得到RDD的大小并不是直截了当的。我们可以估算一下。

我们可以对RDD进行采样,然后使用SizeEstimator来获得样本的大小。正如王和犹斯丁提到的,基于离线采样的数据大小,例如,X行离线使用Y GB, Z行在运行时可能占用Z*Y/X GB

下面是示例scala代码,用于获取RDD的大小/估计。我是scala和spark的新手。下面的示例可以用更好的方式写
def getTotalSize(rdd: RDD[Row]): Long = {
  // This can be a parameter
  val NO_OF_SAMPLE_ROWS = 10l;
  val totalRows = rdd.count();
  var totalSize = 0l
  if (totalRows > NO_OF_SAMPLE_ROWS) {
    val sampleRDD = rdd.sample(true, NO_OF_SAMPLE_ROWS)
    val sampleRDDSize = getRDDSize(sampleRDD)
    totalSize = sampleRDDSize.*(totalRows)./(NO_OF_SAMPLE_ROWS)
  } else {
    // As the RDD is smaller than sample rows count, we can just calculate the total RDD size
    totalSize = getRDDSize(rdd)
  }
  totalSize
}
def getRDDSize(rdd: RDD[Row]) : Long = {
    var rddSize = 0l
    val rows = rdd.collect()
    for (i <- 0 until rows.length) {
       rddSize += SizeEstimator.estimate(rows.apply(i).toSeq.map { value => value.asInstanceOf[AnyRef] })
    }
    rddSize
}

一个直接的方法是调用下面的方法,这取决于你是否想要以序列化的形式存储你的数据,然后去spark UI的"存储"页面,你应该能够计算出RDD(内存+磁盘)的总大小:

rdd.persist(StorageLevel.MEMORY_AND_DISK)
or
rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)

在运行时计算准确的内存大小并不容易。你可以尝试在运行时做一个估计:基于离线采样的数据大小,比如,X行离线使用Y GB,运行时Z行可能占用Z*Y/X GB;这和Justin之前建议的很相似。

我想RDD.count()会给出RDD中元素的数量

这将取决于诸如序列化之类的因素,因此它不是一成不变的。但是,您可以选取一个样本集并对该样本数据进行一些实验,并从中进行推断。

如果你实际在一个集群上处理大数据,这是你应该使用的版本——也就是说,它消除了collect。

def calcRDDSize(rdd: RDD[Row]): Long = {
  rdd.map(_.mkString(",").getBytes("UTF-8").length.toLong)
     .reduce(_+_) //add the sizes together
}
def estimateRDDSize( rdd: RDD[Row], fraction: Double ) : Long = {
  val sampleRDD = rdd.sample(true,fraction)
  val sampleRDDsize = calcRDDSize(sampleRDD)
  println(s"sampleRDDsize is ${sampleRDDsize/(1024*1024)} MB")
  val sampleAvgRowSize = sampleRDDsize / sampleRDD.count()
  println(s"sampleAvgRowSize is $sampleAvgRowSize")
  val totalRows = rdd.count()
  println(s"totalRows is $totalRows")
  val estimatedTotalSize = totalRows * sampleAvgRowSize
  val formatter = java.text.NumberFormat.getIntegerInstance
  val estimateInMB = formatter.format(estimatedTotalSize/(1024*1024))
  println(s"estimatedTotalSize is ${estimateInMB} MB")
  return estimatedTotalSize
}
// estimate using 15% of data
val size = estimateRDDSize(df.rdd,0.15)

相关内容

  • 没有找到相关文章

最新更新