Spark SQL-通用数据集读取器



我正在尝试创建通用数据集[t]读取器,以避免使用dataframe.as [..]为每个阅读器调用。有对原始类型和案例类的支持,所以我正在考虑以下内容:

def read[T <: Product](sql : String): Dataset[T] = {
  import sparkSession.implicits._
  val sqlContext = sparkSession.sqlContext
  val df: DataFrame = sqlContext.read.option("query", sql).load()
  df.as[T]
}

,但是我无法找到存储在数据集中的类型'错误的EDGODER。可以做这样的事情吗?

第二个周期:

def read[T <: Product](sql : String) : Dataset[T] = {
  import sparkSession.implicits._
  innerRead(sql)
}
private def innerRead[T <: Product : Encoder](sql : String): Dataset[T] = {
  val sqlContext = sparkSession.sqlContext
  val df: DataFrame = sqlContext.read.option("query", sql).load()
  df.as[T]
}  

以类型不匹配结束(foudn encoder [nothing],必需的encoder [t](。

我只是尝试导入newproductencoder,但结束了。

为了将DataFrame转换为Dataset,您需要具有Encoder。您可以通过简单地添加T上的上下文和Encoder的上下文来完成此操作:

def read[T <: Product : Encoder](sql : String): Dataset[T] = {
  import sparkSession.implicits._
  val sqlContext = sparkSession.sqlContext
  val df: DataFrame = sqlContext.read.option("query", sql).load()
  df.as[T]
}

上下文是句法糖:

def read[T <: Product](sql : String)(implicit $ev: Encoder[T]): Dataset[T]

这意味着您需要在隐式上下文中拥有一个(又有一个(Encoder[T]的实例。

这是需要的,因为as方法本身需要此上下文绑定。

Spark本身可以通过导入(像您一样(对SparkSession的隐含物,为您提供所需的大部分Encoder S(原始词,String S和case class ES(。但是,这些必须在呼叫站点的隐式范围中可用,这意味着您想要拥有的可能更像以下内容:

def read[T <: Product : Encoder](spark: SparkSession, sql: String): Dataset[T] = {
  import spark.implicits._
  val df: DataFrame = spark.sqlContext.read.option("query", sql).load()
  df.as[T]
}
val spark: SparkSession = ??? // your SparkSession object
import spark.implicits._
val ds: Dataset[YourType] = read[YourType](spark, "select something from a_table")

在您的第二个周期中,也许您需要将类型参数提供给内读呼叫:

innerRead[T](sql)