我有RDD[Row]
,它需要持久化到第三方存储库。但是这个第三方存储库在单个调用中最多接受5mb。
所以我想根据RDD中存在的数据的大小来创建分区,而不是基于RDD中存在的行数。
我如何找到RDD
的大小并基于它创建分区?
正如Justin和Wang提到的,要得到RDD的大小并不是直截了当的。我们可以估算一下。
我们可以对RDD进行采样,然后使用SizeEstimator来获得样本的大小。正如王和犹斯丁提到的,基于离线采样的数据大小,例如,X行离线使用Y GB, Z行在运行时可能占用Z*Y/X GB
下面是示例scala代码,用于获取RDD的大小/估计。我是scala和spark的新手。下面的示例可以用更好的方式写def getTotalSize(rdd: RDD[Row]): Long = {
// This can be a parameter
val NO_OF_SAMPLE_ROWS = 10l;
val totalRows = rdd.count();
var totalSize = 0l
if (totalRows > NO_OF_SAMPLE_ROWS) {
val sampleRDD = rdd.sample(true, NO_OF_SAMPLE_ROWS)
val sampleRDDSize = getRDDSize(sampleRDD)
totalSize = sampleRDDSize.*(totalRows)./(NO_OF_SAMPLE_ROWS)
} else {
// As the RDD is smaller than sample rows count, we can just calculate the total RDD size
totalSize = getRDDSize(rdd)
}
totalSize
}
def getRDDSize(rdd: RDD[Row]) : Long = {
var rddSize = 0l
val rows = rdd.collect()
for (i <- 0 until rows.length) {
rddSize += SizeEstimator.estimate(rows.apply(i).toSeq.map { value => value.asInstanceOf[AnyRef] })
}
rddSize
}
一个直接的方法是调用下面的方法,这取决于你是否想要以序列化的形式存储你的数据,然后去spark UI的"存储"页面,你应该能够计算出RDD(内存+磁盘)的总大小:
rdd.persist(StorageLevel.MEMORY_AND_DISK)
or
rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
在运行时计算准确的内存大小并不容易。你可以尝试在运行时做一个估计:基于离线采样的数据大小,比如,X行离线使用Y GB,运行时Z行可能占用Z*Y/X GB;这和Justin之前建议的很相似。
我想RDD.count()会给出RDD中元素的数量
这将取决于诸如序列化之类的因素,因此它不是一成不变的。但是,您可以选取一个样本集并对该样本数据进行一些实验,并从中进行推断。
如果你实际在一个集群上处理大数据,这是你应该使用的版本——也就是说,它消除了collect。
def calcRDDSize(rdd: RDD[Row]): Long = {
rdd.map(_.mkString(",").getBytes("UTF-8").length.toLong)
.reduce(_+_) //add the sizes together
}
def estimateRDDSize( rdd: RDD[Row], fraction: Double ) : Long = {
val sampleRDD = rdd.sample(true,fraction)
val sampleRDDsize = calcRDDSize(sampleRDD)
println(s"sampleRDDsize is ${sampleRDDsize/(1024*1024)} MB")
val sampleAvgRowSize = sampleRDDsize / sampleRDD.count()
println(s"sampleAvgRowSize is $sampleAvgRowSize")
val totalRows = rdd.count()
println(s"totalRows is $totalRows")
val estimatedTotalSize = totalRows * sampleAvgRowSize
val formatter = java.text.NumberFormat.getIntegerInstance
val estimateInMB = formatter.format(estimatedTotalSize/(1024*1024))
println(s"estimatedTotalSize is ${estimateInMB} MB")
return estimatedTotalSize
}
// estimate using 15% of data
val size = estimateRDDSize(df.rdd,0.15)