我试图在循环中向Scala ListBuffer添加许多对象,但每次添加一个对象,它都会在循环的下一次迭代中消失。
当我在添加新条目之前和之后打印ListBuffer的内容时,我得到以下输出:
添加之前:ListBuffer()
添加后:ListBuffer(com.me.FeatureV2@20d953ba)
添加之前:ListBuffer()
添加后:ListBuffer(com.me.FeatureV2@6b768ce7)
添加之前:ListBuffer()
添加后:ListBuffer(com.me.FeatureV2@123f42d5)
代码:
def generateStatistics(df: DataFrame): List[FeatureV2] = {
var features = ListBuffer[FeatureV2]()
val dataColumn = "data"
for (field <- df.schema.fieldNames){
val columnType: String = df.select(field).dtypes(0)._2
if (columnType == StringType.toString){
val statsDf: DataFrame = getStats(df, field, dataColumn)
for (row <- statsDf){
println("Before add: " + features)
val feature = new FeatureV2()
feature.element = row.getString(0)
feature.count = row.getLong(1)
feature.sum = row.getDouble(2)
feature.max = row.getDouble(3)
feature.min = row.getDouble(4)
feature.feature = field
features += feature
println("After add: " + features)
}
}
}
features.toList
}
然而,有时我会得到以下信息:
添加之前:ListBuffer()
添加后:ListBuffer(com.me.FeatureV2@1433183c)
添加之前:ListBuffer(com.me.FeatureV2@1433183c)
添加后:ListBuffer(com.me.FeatureV2@1433183c,com.me.FeatureV2@4b0df9e5)
添加之前:ListBuffer()
添加后:ListBuffer(com.me.FeatureV2@1e201b19)
看起来它实际上是在填充ListBuffer,但它正在被清除。与垃圾收集有关?
尝试将for (row <- statsDf)
更改为for (row <- statsDf.collect())
。
如果这解决了您的问题,那么您的问题可能是由foreach
在一个或多个线程中运行这一事实引起的。
for (row <- stadsDf)
是实际调用的DataFrame.foreach(f: Row => Unit)
,它是一个分布式foreach
,其中f
可以在任何数量的线程或机器上运行,具体取决于您的Spark主机。
Spark应用程序由驱动程序和执行程序组成。您可以从驱动程序中控制和创建事物——执行器获取范围内变量的副本。因此,执行者获得ListBuffer
的副本。它们附加到它们的副本中,这些副本在任务完成时丢失。
您可以使用collect()
将数据拉入驱动程序以附加到那里的ListBuffer
,或者使用广播变量。
有关讨论,请参阅文档。
集合是可变的吗?
此外,当使用Scala时,应该努力进行FP.
df.schema.fieldNames.map {...}
可能会做你需要的工作。既然你有一个if
,也许collect
会更适合