Scala-ListBuffer在每个外接程序循环后清空自己



我试图在循环中向Scala ListBuffer添加许多对象,但每次添加一个对象,它都会在循环的下一次迭代中消失。

当我在添加新条目之前和之后打印ListBuffer的内容时,我得到以下输出:

添加之前:ListBuffer()

添加后:ListBuffer(com.me.FeatureV2@20d953ba)

添加之前:ListBuffer()

添加后:ListBuffer(com.me.FeatureV2@6b768ce7)

添加之前:ListBuffer()

添加后:ListBuffer(com.me.FeatureV2@123f42d5)

代码:

def generateStatistics(df: DataFrame): List[FeatureV2] = {
    var features = ListBuffer[FeatureV2]()
    val dataColumn = "data"
    for (field <- df.schema.fieldNames){
      val columnType: String = df.select(field).dtypes(0)._2
      if (columnType == StringType.toString){
        val statsDf: DataFrame = getStats(df, field, dataColumn)
        for (row <- statsDf){
          println("Before add: " + features)
          val feature = new FeatureV2()
          feature.element = row.getString(0)
          feature.count = row.getLong(1)
          feature.sum = row.getDouble(2)
          feature.max = row.getDouble(3)
          feature.min = row.getDouble(4)
          feature.feature = field
          features += feature
          println("After add: " + features)
        }
      }
    }
    features.toList
  }

然而,有时我会得到以下信息:

添加之前:ListBuffer()

添加后:ListBuffer(com.me.FeatureV2@1433183c)

添加之前:ListBuffer(com.me.FeatureV2@1433183c)

添加后:ListBuffer(com.me.FeatureV2@1433183c,com.me.FeatureV2@4b0df9e5)

添加之前:ListBuffer()

添加后:ListBuffer(com.me.FeatureV2@1e201b19)

看起来它实际上是在填充ListBuffer,但它正在被清除。与垃圾收集有关?

尝试将for (row <- statsDf)更改为for (row <- statsDf.collect())

如果这解决了您的问题,那么您的问题可能是由foreach在一个或多个线程中运行这一事实引起的。

for (row <- stadsDf)是实际调用的DataFrame.foreach(f: Row => Unit),它是一个分布式foreach,其中f可以在任何数量的线程或机器上运行,具体取决于您的Spark主机。

Spark应用程序由驱动程序和执行程序组成。您可以从驱动程序中控制和创建事物——执行器获取范围内变量的副本。因此,执行者获得ListBuffer的副本。它们附加到它们的副本中,这些副本在任务完成时丢失。

您可以使用collect()将数据拉入驱动程序以附加到那里的ListBuffer,或者使用广播变量。

有关讨论,请参阅文档。

集合是可变的吗?

此外,当使用Scala时,应该努力进行FP.

df.schema.fieldNames.map {...}

可能会做你需要的工作。既然你有一个if,也许collect会更适合

最新更新