将数据框转换为强类型数据集?



我有以下类,run从数据库表中返回整数列表。

class ItemList(sqlContext: org.apache.spark.sql.SQLContext, jdbcSqlConn: String) {
def run(date: LocalDate) = {
sqlContext.read.format("jdbc").options(Map(
"driver" -> "com.microsoft.sqlserver.jdbc.SQLServerDriver",
"url" -> jdbcSqlConn,
"dbtable" -> s"dbo.GetList('$date')"
)).load()
}
}

以下代码

val conf = new SparkConf()
val sc = new SparkContext(conf.setAppName("Test").setMaster("local[*]"))
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val itemListJob = new ItemList(sqlContext, jdbcSqlConn)
val processed = itemListJob.run(rc, priority).select("id").map(d => {
runJob.run(d) // d expected to be int
})
processed.saveAsTextFile("c:\temp\mpa")

获取错误

[错误] ...\src\main\scala\main.scala:39:类型不匹配; [错误] 找到: org.apache.spark.sql.Row [错误] 必需:Int [error] runJob.run(d) [错误] ^ [错误] 发现一个错误 [错误](编译:编译增量)编译失败,

我试过

  1. val processed = itemListJob.run(rc, priority).select("id").as[Int].map(d =>
  2. case class itemListRow(id: Int); ....as[itemListRow].

他们俩都得到了错误

找不到存储在数据集中的类型的编码器。导入支持基元类型(整数、字符串等)和产品类型(案例类spark.implicits._ 将来的版本中将添加对序列化其他类型的支持。

更新:我正在尝试添加导入隐式语句

  1. import sc.implicits._错误

    值隐式不是 org.apache.spark.SparkContext 的成员

  2. import sqlContext.implicits._没关系。但是,processed.saveAsTextFile("c:\temp\mpa")后来的陈述得到了错误

    value saveAsTextFile 不是 org.apache.spark.sql.Dataset[(Int, java.time.LocalDate)] 的成员

您只需将带有select("id")的行更改为如下:

select("id").as[Int]

您应该导入用于将Rows转换为 Int 的隐式。

import sqlContext.implicits._ // <-- import implicits that add the "magic"

您还可以更改run以包含转换,如下所示(请注意我添加的行的注释):

class ItemList(sqlContext: org.apache.spark.sql.SQLContext, jdbcSqlConn: String) {
def run(date: LocalDate) = {
import sqlContext.implicits._ // <-- import implicits that add the "magic"
sqlContext.read.format("jdbc").options(Map(
"driver" -> "com.microsoft.sqlserver.jdbc.SQLServerDriver",
"url" -> jdbcSqlConn,
"dbtable" -> s"dbo.GetList('$date')"
)).load()
.select("id") // <-- take only "id" (which Spark pushes down and hence makes your query faster
.as[Int] // <-- convert Row into Int
}
}

value saveAsTextFile 不是 org.apache.spark.sql.Dataset[(Int, java.time.LocalDate)] 的成员

编译错误是因为您尝试对不可用Dataset使用saveAsTextFile操作。

在Spark SQL中写入是通过DataFrameWriter进行的,可以使用write运算符:

write:DataFrameWriter[T]接口,用于将非流式数据集的内容保存到外部存储中。

因此,您应该执行以下操作:

processed.write.text("c:\temp\mpa")

做!

最新更新