I'm trying out Apache Spark 2.0 Datasets:
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.Encoder
import spark.implicits._
case class C1(f1: String, f2: String, f3: String, f4: String, f5: Double)
val teams = Seq(
  C1("hash1", "NLC", "Cubs", "2016-01-23", 3253.21),
  C1("hash1", "NLC", "Cubs", "2014-01-23", 353.88),
  C1("hash3", "NLW", "Dodgers", "2013-08-15", 4322.12),
  C1("hash4", "NLE", "Red Sox", "2010-03-14", 10283.72)
).toDS()

val c1Agg = new Aggregator[C1, Seq[C1], Seq[C1]] with Serializable {
  def zero: Seq[C1] = Seq.empty[C1] // Nil
  def reduce(b: Seq[C1], a: C1): Seq[C1] = b :+ a
  def merge(b1: Seq[C1], b2: Seq[C1]): Seq[C1] = b1 ++ b2
  def finish(r: Seq[C1]): Seq[C1] = r
  override def bufferEncoder: Encoder[Seq[C1]] = newProductSeqEncoder[C1]
  override def outputEncoder: Encoder[Seq[C1]] = newProductSeqEncoder[C1]
}.toColumn
val g_c1 = teams.groupByKey(_.f1).agg(c1Agg).collect
But when I run it, I get the following error:
scala.reflect.internal.MissingRequirementError: class lineb4c2bb72bf6e417e9975d1a65602aec912.$read in JavaMirror with sun.misc.Launcher$AppClassLoader@14dad5dc of type class sun.misc.Launcher$AppClassLoader with classpath [omitted] not found
I assume the configuration is correct, since I'm running on Databricks Community Cloud.
I was finally able to make it work by using ExpressionEncoder() instead of newProductSeqEncoder[C1] in the bufferEncoder and outputEncoder definitions.
(I don't know why the earlier code didn't work, though.)
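For reference, here is a minimal sketch of the working version as I understand the fix. Only the ExpressionEncoder import and the two encoder definitions change; everything else is unchanged from the code above, and the comment reflects my reading of what ExpressionEncoder() does here, not a confirmed explanation of the original failure:

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

val c1Agg = new Aggregator[C1, Seq[C1], Seq[C1]] with Serializable {
  def zero: Seq[C1] = Seq.empty[C1]
  def reduce(b: Seq[C1], a: C1): Seq[C1] = b :+ a
  def merge(b1: Seq[C1], b2: Seq[C1]): Seq[C1] = b1 ++ b2
  def finish(r: Seq[C1]): Seq[C1] = r
  // ExpressionEncoder() lets Catalyst derive Encoder[Seq[C1]] from the
  // expected return type via runtime reflection.
  override def bufferEncoder: Encoder[Seq[C1]] = ExpressionEncoder()
  override def outputEncoder: Encoder[Seq[C1]] = ExpressionEncoder()
}.toColumn

val g_c1 = teams.groupByKey(_.f1).agg(c1Agg).collect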