尝试将数据帧行映射到更新行时编码器错误



当我试图在我的代码中做同样的事情时

dataframe.map(row => {
  val row1 = row.getAs[String](1)
  val make = if (row1.toLowerCase == "tesla") "S" else row1
  Row(row(0),make,row(2))
})

我从这里得到了上述参考:Scala:如何使用Scala替换Dataframs中的值但我得到编码器错误

无法找到存储在数据集中的类型的编码器。原始类型(Int, S string等)和Product类型(case类)进口火花。im plicits。支持序列化其他类型在以后的版本中添加。

注意:我正在使用spark 2.0!

这里没有什么意想不到的。您正在尝试使用用Spark 1编写的代码。在Spark 2.0中不再支持

  • 1。x DataFrame.map((Row) ⇒ T)(ClassTag[T]) ⇒ RDD[T]
  • 2。x Dataset[Row].map((Row) ⇒ T)(Encoder[T]) ⇒ Dataset[T]

老实说,它在1中没有多大意义。x。与版本无关,您可以简单地使用DataFrame API:

import org.apache.spark.sql.functions.{when, lower}
val df = Seq(
  (2012, "Tesla", "S"), (1997, "Ford", "E350"),
  (2015, "Chevy", "Volt")
).toDF("year", "make", "model")
df.withColumn("make", when(lower($"make") === "tesla", "S").otherwise($"make"))

如果你真的想使用map,你应该使用静态类型的Dataset:

import spark.implicits._
case class Record(year: Int, make: String, model: String)
df.as[Record].map {
  case tesla if tesla.make.toLowerCase == "tesla" => tesla.copy(make = "S")
  case rec => rec
}

或至少返回一个具有隐式编码器的对象:

df.map {
  case Row(year: Int, make: String, model: String) => 
    (year, if(make.toLowerCase == "tesla") "S" else make, model)
}

最后,如果出于一些完全疯狂的原因,你真的想映射到Dataset[Row]上,你必须提供所需的编码器:

import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
// Yup, it would be possible to reuse df.schema here
val schema = StructType(Seq(
  StructField("year", IntegerType),
  StructField("make", StringType),
  StructField("model", StringType)
))
val encoder = RowEncoder(schema)
df.map {
  case Row(year, make: String, model) if make.toLowerCase == "tesla" => 
    Row(year, "S", model)
  case row => row
} (encoder)

对于预先知道数据框架模式的场景,@zero323给出的答案是解决方案

,但对于动态模式的场景/或将多个数据帧传递给泛型函数:下面的代码在我们从1.6.1向2.2.0迁移的过程中可以正常工作

import org.apache.spark.sql.Row
val df = Seq(
   (2012, "Tesla", "S"), (1997, "Ford", "E350"),
   (2015, "Chevy", "Volt")
 ).toDF("year", "make", "model")
val data = df.rdd.map(row => {
  val row1 = row.getAs[String](1)
  val make = if (row1.toLowerCase == "tesla") "S" else row1
  Row(row(0),make,row(2))
})

这段代码在两个版本的spark上执行。

缺点:提供了优化在dataframe/datassets上的spark不会应用API

只是为了更好地理解其他答案(特别是@zero323关于 map优于Dataset[Row] 的回答的最后一点)添加一些其他重要的知道点:

  • 首先,Dataframe.map给你一个Dataset(更具体地说,Dataset[T],而不是Dataset[Row])!
  • Dataset[T]总是需要一个编码器,这就是这个句子&; Dataset[Row].map &; ((Row) ⇒ T)(Encoder[T]) ⇒ Dataset[T] &;意思。
  • Spark已经预定义了许多编码器(可以通过执行import spark.implicits._来实现import),但是该列表仍然无法涵盖开发人员可能创建的许多领域特定类型,在这种情况下,您需要自己创建编码器。
  • 在本页的具体示例中,df.mapDataset返回Row类型,并挂一分钟,Row类型不在Spark预定义的编码器类型列表中,因此您将自己创建一个。
  • 我承认为Row类型创建编码器与上面链接中描述的方法有点不同,您必须使用RowEncoder,它将StructType作为描述行类型的参数,就像上面的@zero323提供的那样:
// this describes the internal type of a row
val schema = StructType(Seq(StructField("year", IntegerType), StructField("make", StringType), StructField("model", StringType)))
// and this completes the creation of encoder
// for the type `Row` with internal schema described above
val encoder = RowEncoder(schema)

在我的spark 2.4.4版本中,我必须导入隐式。这是一个一般的答案

val spark2 = spark
import spark2.implicits._
val data = df.rdd.map(row => my_func(row))

my_func做了一些操作。

相关内容

  • 没有找到相关文章

最新更新