请注意:虽然这个问题提到了Spark(2.1(,但我认为这确实是一个Scala(2.11(问题的核心,任何精通Scala开发人员都可以回答它!
我有以下代码可以创建一个 Spark 数据集(基本上是一个 2D 表(并逐行迭代它。如果特定行的username
列的值为"fizzbuzz",那么我想设置一个在迭代器外部定义的变量,并在行迭代完成后使用该变量:
val myDataset = sqlContext
.read
.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> "mytable", "keyspace" -> "mykeyspace"))
.load()
var foobar : String
myDataset.collect().foreach(rec =>
if(rec.getAs("username") == "fizzbuzz") {
foobar = rec.getAs("foobarval")
}
)
if(foobar == null) {
throw new Exception("The fizzbuzz user was not found.")
}
当我运行它时,我得到以下异常:
error: class $iw needs to be abstract, since:
it has 2 unimplemented members.
/** As seen from class $iw, the missing signatures are as follows.
* For convenience, these are usable as stub implementations.
*/
def foobar=(x$1: String): Unit = ???
class $iw extends Serializable {
^
我得到这个有什么特别的原因吗?
在方法或非抽象类中,必须为每个变量定义一个值;在这里,您将foobar
保留为未定义。如果您将其定义为具有null
的初始值,则事情将按预期工作:
var foobar: String = null
但是:请注意,您的代码既不习惯(不遵循 Scala 和 Spark 的最佳实践(,又有潜在风险/缓慢:
- 你应该避免可变的值,如
foobar
- 不可变代码更容易推理,并且真的会让你利用Scala的强大功能 - 应避免在数据帧上调用
collect
,除非您确定它非常小,因为collect
会将工作器节点(其中可能有很多(的所有数据收集到单个驱动程序节点中,这会很慢,并可能导致OutOfMemoryError
。 - 不鼓励使用
null
(因为它通常会导致意外NullPointerException
(
此代码的更惯用版本将使用DataFrame.filter
来筛选相关记录,并且可能Option
正确表示潜在的空值,如下所示:
import spark.implicits._
val foobar: Option[String] = myDataset
.filter($"username" === "fizzbuzz") // filter only relevant records
.take(1) // get first 1 record (if it exists) as an Array[Row]
.headOption // get the first item in the array, or None
.map(r => r.getAs[String]("foobarval")) // get the value of the column "foobarval", or None
if (foobar.isEmpty) {
throw new Exception("The fizzbuzz user was not found.")
}
foobar
变量应该初始化:
var foobar: String = null
这也看起来不对:
foobar = rec.getAs("foobarval")
并且应该是:
foobar = rec.getAs[String]("foobarval")
总的来说,这不是要走的路。它根本没有从Spark执行模型中受益。我会过滤并取而代之:
myDataset.filter($"username" === "fizzbuzz").select("foobarval").take(1)
您可能应该在数据帧上使用筛选器和选择:
import spark.sqlContext.implicits._
val data = spark.sparkContext.parallelize(List(
"""{ "username": "none", "foobarval":"none" }""",
"""{ "username": "fizzbuzz", "foobarval":"expectedval" }"""))
val df = spark.read.json(data)
val foobar = df.filter($"username" === "fizzbuzz").select($"foobarval").collect.head.getString(0)