Scala Spark: How to bootstrap samples from a column of a Spark DataFrame



I want to sample values, with replacement, from one column of a Spark DataFrame using Scala, in a Jupyter Notebook running on a cluster. How can I do that?

I tried the following function that I found online:

import scala.annotation.tailrec
import scala.util.Random

def bootstrapMean(originalData: Array[Double]): Double = {
  val n = originalData.length
  // draw one value uniformly at random, with replacement
  def draw: Double = originalData(Random.nextInt(n))
  // a tail-recursive loop that draws `current` values and adds them to the running sum
  @tailrec
  def drawAndSumValues(current: Int, acc: Double = 0D): Double = {
    if (current == 0) acc
    else drawAndSumValues(current - 1, acc + draw)
  }
  drawAndSumValues(n) / n
}
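For reference, the function above is the plain (non-Spark) bootstrap of the mean: draw n values uniformly with replacement and average them. Below is a minimal sketch of the same idea that takes an explicit scala.util.Random, so that replicates can be reproduced with a seed. The names BootstrapLocal and bootstrapMeanSeeded are hypothetical, chosen for illustration.

```scala
import scala.annotation.tailrec
import scala.util.Random

object BootstrapLocal {
  // Hypothetical helper: one bootstrap replicate of the mean, drawing
  // data.length values uniformly with replacement using the supplied generator.
  def bootstrapMeanSeeded(data: Array[Double], rng: Random): Double = {
    require(data.nonEmpty, "cannot bootstrap an empty array")
    val n = data.length

    @tailrec
    def loop(remaining: Int, acc: Double): Double =
      if (remaining == 0) acc
      else loop(remaining - 1, acc + data(rng.nextInt(n)))

    loop(n, 0.0) / n
  }

  def main(args: Array[String]): Unit = {
    val data = Array(1.0, 2.0, 3.0, 4.0)
    val rng = new Random(42L) // fixed seed, so the replicates are reproducible
    val replicates = Vector.fill(10)(bootstrapMeanSeeded(data, rng))
    println(replicates.mkString(", "))
  }
}
```

Passing the generator in, rather than calling the global Random object, is what makes two runs with the same seed produce the same replicates.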

and called it like this:

val data = stack.select("column_with_values").collect.map(_.toSeq).flatten
val m = 10
val bootstraps = Vector.fill(m)(bootstrapMean(data))

But I got this error:

An error was encountered:
<console>:47: error: type mismatch;
found   : Array[Any]
required: Array[Double]
Note: Any >: Double, but class Array is invariant in type T.
You may wish to investigate a wildcard type such as `_ >: Double`. (SLS 3.2.10)
val bootstraps = Vector.fill(m)(bootstrapMean(data))
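The mismatch comes from how data is built: Row.toSeq returns Seq[Any], so flattening the collected rows gives Array[Any], and because Scala arrays are invariant, an Array[Any] is not accepted where Array[Double] is expected. Assuming the column is DoubleType and contains no nulls, a Spark-side fix is to read the value with Row's typed getter, for example stack.select("column_with_values").collect().map(_.getDouble(0)), or to use .as[Double] after import spark.implicits._. The plain-Scala sketch below reproduces the conversion, with a hypothetical Array[Any] standing in for the collected rows.

```scala
object TypedCollect {
  // Stand-in for `stack.select("column_with_values").collect.map(_.toSeq).flatten`,
  // whose element type is Any (hypothetical sample values).
  val collected: Array[Any] = Array(1.5, 2.0, 3.5)

  // Narrow each element to Double; a non-Double element (e.g. null or a Long)
  // fails loudly here instead of producing a wrong statistic later.
  def toDoubles(values: Array[Any]): Array[Double] = values.map {
    case d: Double => d
    case other     => throw new IllegalArgumentException(s"expected Double, got: $other")
  }

  // This typed array can be passed to bootstrapMean.
  val data: Array[Double] = toDoubles(collected)
}
```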

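I don't know how to debug this, or whether it is worth pursuing or I should try a different approach. I'm looking for ideas, documentation, or code. Thanks.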

Update:

How can I put user mck's solution below into a for loop? I tried the following:

var bootstrap_container = Seq()
var a = 1
for (a <- 1 until 3) {
  var sampled = stack_b.select("diff_hours").sample(withReplacement = true, fraction = 0.5, seed = a)
  var smpl_average = sampled.select(avg("diff_hours")).collect()(0)(0)
  var bootstrap_smpls = bootstrap_container.union(Seq(smpl_average)).collect()
}
bootstrap_smpls

But this gives an error:

<console>:49: error: not enough arguments for method collect: (pf: PartialFunction[Any,B])(implicit bf: scala.collection.generic.CanBuildFrom[Seq[Any],B,That])That.
Unspecified value parameter pf.
var bootstrap_smpls = bootstrap_container.union(Seq(smpl_average)).collect()
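Three things go wrong in that loop. Seq() has type Seq[Nothing], and combining it with Seq(smpl_average), where smpl_average is Any, yields a Seq[Any]. The .collect() call is then the Scala collections method, which requires a PartialFunction argument, not Spark's DataFrame.collect(); that is the "not enough arguments" error. Finally, bootstrap_smpls is declared inside the loop body, so it is out of scope on the last line. A minimal sketch that avoids the mutable container by yielding one typed value per seed is below; sampleMean is a hypothetical, plain-Scala stand-in for the Spark sample-then-average step.

```scala
import scala.util.Random

object BootstrapLoop {
  // Hypothetical stand-in for:
  //   stack_b.select("diff_hours").sample(withReplacement = true, fraction = 0.5, seed = seed)
  //     .select(avg("diff_hours")).first().getDouble(0)
  // Here it averages exactly data.length values drawn with replacement
  // (Spark's sample does not guarantee an exact sample size).
  def sampleMean(data: IndexedSeq[Double], seed: Long): Double = {
    val rng = new Random(seed)
    val draws = IndexedSeq.fill(data.length)(data(rng.nextInt(data.length)))
    draws.sum / draws.length
  }

  val data: IndexedSeq[Double] = IndexedSeq(2.0, 4.0, 6.0, 8.0)

  // One Double per seed, collected by the for-comprehension itself: no var,
  // no Seq[Nothing], and the result is still in scope after the loop.
  val bootstrapSmpls: IndexedSeq[Double] =
    for (seed <- 1 until 3) yield sampleMean(data, seed.toLong)
}
```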

Answer:

You can use the DataFrame's sample method. For example, to sample with replacement using a fraction of 0.5:

val sampled = stack.select("column_with_values").sample(true, 0.5)
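Note that with withReplacement = true, Spark treats fraction as the expected number of times each row is drawn (each row gets a Poisson-distributed count), so the sample size is random and, with 0.5, is about half the original row count. A classic bootstrap resample has the same size as the data, which corresponds to fraction = 1.0 in expectation. The plain-Scala sketch below imitates that Poisson scheme to make the behavior visible; it is an illustration under that assumption, not Spark's sampler code, and PoissonResample and poissonSample are hypothetical names.

```scala
import scala.util.Random

object PoissonResample {
  // Knuth's method for one Poisson(lambda) draw; adequate for small lambda such as 0.5 or 1.0.
  def poisson(lambda: Double, rng: Random): Int = {
    val limit = math.exp(-lambda)
    var k = 0
    var p = rng.nextDouble()
    while (p > limit) {
      k += 1
      p *= rng.nextDouble()
    }
    k
  }

  // Hypothetical helper: emit each element a Poisson(fraction) number of times,
  // so the output size is random with expected value fraction * data.length.
  def poissonSample[A](data: Seq[A], fraction: Double, seed: Long): Seq[A] = {
    val rng = new Random(seed)
    data.flatMap(x => Seq.fill(poisson(fraction, rng))(x))
  }
}
```

For example, poissonSample(1 to 10000, 1.0, 42L) returns roughly, but usually not exactly, 10000 elements, some repeated and some missing.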

To get the mean, you can do:

import org.apache.spark.sql.functions.avg
val col_average = sampled.select(avg("column_with_values")).collect()(0)(0)
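The expression collect()(0)(0) returns the average typed as Any. If a sample happens to contain no rows, avg returns null, and in Scala 2 casting null with asInstanceOf[Double] silently yields 0.0 instead of failing, which would bias the bootstrap mean. On the Spark side, Row.isNullAt(0) can check for this; the plain-Scala sketch below keeps the empty case explicit instead. SafeAverage and averageOrNone are hypothetical names, and the argument stands in for the collected value.

```scala
object SafeAverage {
  // `raw` stands in for `sampled.select(avg("column_with_values")).collect()(0)(0)`,
  // which is typed Any and is null when the sample has no rows.
  def averageOrNone(raw: Any): Option[Double] = raw match {
    case d: Double => Some(d) // boxed java.lang.Double from the Row
    case null      => None    // empty sample: avg returned null
    case other     => throw new IllegalArgumentException(s"unexpected value: $other")
  }
}
```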

Edit

import org.apache.spark.sql.functions.avg

// collect one bootstrap mean per seed
var bootstrap_container = List[Double]()
for (a <- 1 until 3) {
  val sampled = stack_b2.select("diff_hours").sample(withReplacement = true, fraction = 0.5, seed = a)
  val smpl_average = sampled.select(avg("diff_hours")).collect()(0)(0)
  bootstrap_container = bootstrap_container :+ smpl_average.asInstanceOf[Double]
}
val mean_bootstrap = bootstrap_container.reduce(_ + _) / bootstrap_container.length
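Averaging the bootstrap means mostly recovers the original sample mean; a bootstrap is usually run for the spread of those means, that is, the bootstrap estimate of the standard error, which needs many more replicates than the two produced by 1 until 3. Assuming that is the goal, here is a plain-Scala sketch that summarizes a List[Double] such as bootstrap_container; BootstrapSummary and bootstrapSummary are hypothetical names.

```scala
object BootstrapSummary {
  // Returns (mean, standard deviation) of the bootstrap replicates.
  // Uses the n - 1 denominator, so at least two replicates are required.
  def bootstrapSummary(means: Seq[Double]): (Double, Double) = {
    require(means.length >= 2, "need at least two bootstrap replicates")
    val n = means.length
    val mean = means.sum / n
    val variance = means.map(m => (m - mean) * (m - mean)).sum / (n - 1)
    (mean, math.sqrt(variance))
  }
}
```

For example, bootstrapSummary(List(1.0, 2.0, 3.0)) returns (2.0, 1.0).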

Latest update