I am using Databricks Community Edition with the Spark 2.0 preview. I tried the following (simple) code:
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.Encoder
import java.util.Calendar
import spark.implicits._
case class C1(f1: String, f2: String, f3: String, f4:java.sql.Date, f5: Double)
val teams = sc.parallelize(Seq(
  C1("hash1", "NLC", "Cubs", java.sql.Date.valueOf("2016-01-23"), 3253.21),
  C1("hash1", "NLC", "Cubs", java.sql.Date.valueOf("2014-01-23"), 353.88),
  C1("hash3", "NLW", "Dodgers", java.sql.Date.valueOf("2013-08-15"), 4322.12),
  C1("hash4", "NLE", "Red Sox", java.sql.Date.valueOf("2010-03-14"), 10283.72)
)).toDS
object C1Agg extends Aggregator[C1, Seq[C1], Seq[C1]] {
  def zero: Seq[C1] = Seq.empty[C1] // Nil
  def reduce(b: Seq[C1], a: C1): Seq[C1] = b :+ a
  def merge(b1: Seq[C1], b2: Seq[C1]): Seq[C1] = b1 ++ b2
  def finish(r: Seq[C1]): Seq[C1] = r
  override def bufferEncoder: Encoder[Seq[C1]] = newProductSeqEncoder[C1]
  override def outputEncoder: Encoder[Seq[C1]] = newProductSeqEncoder[C1]
}
val g_c1 = teams.groupByKey(_.f1).agg[Seq[C1]](C1Agg.toColumn).collect
I get the following error message:
error: type mismatch;
 found   : org.apache.spark.sql.TypedColumn[C1,Seq[C1]]
 required: org.apache.spark.sql.TypedColumn[C1,Seq[C1]]
       val g_c1 = teams.groupByKey(_.f1).agg[Seq[C1]](C1Agg.toColumn).collect
When I instead use
val g_c1 = teams.groupByKey(_.f1).agg(C1Agg.toColumn).collect
I get:
error: type mismatch;
 found   : org.apache.spark.sql.TypedColumn[C1,Seq[C1]]
 required: org.apache.spark.sql.TypedColumn[C1,?]
       val g_c1 = teams.groupByKey(_.f1).agg(C1Agg.toColumn).collect
Any hints?
I found the cause: I had declared the case class in one notebook cell and then used it in a different, later cell. Putting the whole code in the same cell solved the problem. (Unfortunately, I now face another issue, a MissingRequirementError.)
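For reference, a minimal single-cell sketch of that arrangement (assuming a Spark 2.x notebook where spark is the active session; swapping the two encoders to Encoders.kryo is a commonly suggested workaround for reflection-based encoder failures on notebook-defined case classes, not something verified against this exact preview build):

// Everything in ONE notebook cell: case class, aggregator, and query.
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator
import spark.implicits._

case class C1(f1: String, f2: String, f3: String, f4: java.sql.Date, f5: Double)

object C1Agg extends Aggregator[C1, Seq[C1], Seq[C1]] {
  def zero: Seq[C1] = Seq.empty[C1]
  def reduce(b: Seq[C1], a: C1): Seq[C1] = b :+ a
  def merge(b1: Seq[C1], b2: Seq[C1]): Seq[C1] = b1 ++ b2
  def finish(r: Seq[C1]): Seq[C1] = r
  // Kryo stores the buffer/output as an opaque binary blob, bypassing the
  // reflection-based encoder derivation that can fail (MissingRequirementError)
  // for case classes defined inside a notebook cell.
  def bufferEncoder: Encoder[Seq[C1]] = Encoders.kryo[Seq[C1]]
  def outputEncoder: Encoder[Seq[C1]] = Encoders.kryo[Seq[C1]]
}

val teams = Seq(
  C1("hash1", "NLC", "Cubs", java.sql.Date.valueOf("2016-01-23"), 3253.21),
  C1("hash3", "NLW", "Dodgers", java.sql.Date.valueOf("2013-08-15"), 4322.12)
).toDS

val g_c1 = teams.groupByKey(_.f1).agg(C1Agg.toColumn).collect

The trade-off of the Kryo fallback is that the aggregated column is no longer a structured column Spark can optimize; collect still deserializes it back to Seq[C1].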