Scala:java.lang.UnsupportedOperationException:不支持Primitive类型



我添加了以下代码:

var counters: Map[String, Int] = Map()
val results = rdd.filter(l => l.contains("xyz")).map(l => mapEvent(l)).filter(r => r.isDefined).map (
i => {
val date = i.get.getDateTime.toString.substring(0, 10)
counters = counters.updated(date, counters.getOrElse(date, 0) + 1)
}
)

我想在一次迭代中获得RDD中不同日期的计数。但当我运行这个时,我收到消息说:

No implicits found for parameters evidence$6: Encoder[Unit]

所以我加了一行:

implicit val myEncoder: Encoder[Unit] = org.apache.spark.sql.Encoders.kryo[Unit]

但后来我犯了这个错误。

Exception in thread "main" java.lang.ExceptionInInitializerError
at com.xyz.SparkBatchJob.main(SparkBatchJob.scala)
Caused by: java.lang.UnsupportedOperationException: Primitive types are not supported.
at org.apache.spark.sql.Encoders$.genericSerializer(Encoders.scala:200)
at org.apache.spark.sql.Encoders$.kryo(Encoders.scala:152)

我该如何解决这个问题?或者有没有更好的方法可以在一次迭代(O(N(时间(中获得我想要的计数?

Spark RDD是分布式集合的表示。当您将映射函数应用于RDD时,用于操作集合的函数将在集群中执行,因此更改在映射函数范围之外创建的变量是没有意义的。

在您的代码中,问题是因为您没有返回任何值,而是试图更改结构,因此编译器推断转换后新创建的RDD是RDD[Unit]。

如果需要通过Spark操作创建Map,则必须创建pairRDD,然后应用reduce操作。

包括rdd的类型和mapEvent函数,以了解如何执行。

Spark通过转换和操作构建了一个DAG,它不处理两次数据。

最新更新