Spark/Scala 迭代器无法分配在 foreach 循环之外定义的变量



请注意:虽然这个问题提到了Spark(2.1(,但我认为这确实是一个Scala(2.11(问题的核心,任何精通Scala开发人员都可以回答它!


我有以下代码可以创建一个 Spark 数据集(基本上是一个 2D 表(并逐行迭代它。如果特定行的username列的值为"fizzbuzz",那么我想设置一个在迭代器外部定义的变量,并在行迭代完成后使用该变量:

val myDataset = sqlContext
.read
.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> "mytable", "keyspace" -> "mykeyspace"))
.load()
var foobar : String
myDataset.collect().foreach(rec =>
if(rec.getAs("username") == "fizzbuzz") {
foobar = rec.getAs("foobarval")
}
)
if(foobar == null) {
throw new Exception("The fizzbuzz user was not found.")
}

当我运行它时,我得到以下异常:

error: class $iw needs to be abstract, since:
it has 2 unimplemented members.
/** As seen from class $iw, the missing signatures are as follows.
*  For convenience, these are usable as stub implementations.
*/
def foobar=(x$1: String): Unit = ???
class $iw extends Serializable {
^

我得到这个有什么特别的原因吗?

在方法或非抽象类中,必须为每个变量定义一个值;在这里,您将foobar保留为未定义。如果您将其定义为具有null的初始值,则事情将按预期工作:

var foobar: String = null

但是:请注意,您的代码既不习惯(不遵循 Scala 和 Spark 的最佳实践(,又有潜在风险/缓慢:

  • 你应该避免可变的值,如foobar- 不可变代码更容易推理,并且真的会让你利用Scala的强大功能
  • 应避免在数据帧上调用collect,除非您确定它非常小,因为collect会将工作器节点(其中可能有很多(的所有数据收集到单个驱动程序节点中,这会很慢,并可能导致OutOfMemoryError
  • 不鼓励使用null(因为它通常会导致意外NullPointerException(

此代码的更惯用版本将使用DataFrame.filter来筛选相关记录,并且可能Option正确表示潜在的空值,如下所示:

import spark.implicits._
val foobar: Option[String] = myDataset
.filter($"username" === "fizzbuzz") // filter only relevant records
.take(1) // get first 1 record (if it exists) as an Array[Row]
.headOption // get the first item in the array, or None
.map(r => r.getAs[String]("foobarval")) // get the value of the column "foobarval", or None
if (foobar.isEmpty) {
throw new Exception("The fizzbuzz user was not found.")
}

foobar变量应该初始化:

var foobar: String = null

这也看起来不对:

foobar = rec.getAs("foobarval")

并且应该是:

foobar = rec.getAs[String]("foobarval")

总的来说,这不是要走的路。它根本没有从Spark执行模型中受益。我会过滤并取而代之:

myDataset.filter($"username" === "fizzbuzz").select("foobarval").take(1)

您可能应该在数据帧上使用筛选器和选择:

import spark.sqlContext.implicits._
val data = spark.sparkContext.parallelize(List(
"""{ "username": "none", "foobarval":"none" }""",
"""{ "username": "fizzbuzz", "foobarval":"expectedval" }"""))
val df = spark.read.json(data)
val foobar = df.filter($"username" === "fizzbuzz").select($"foobarval").collect.head.getString(0)

最新更新