我正在尝试创建通用数据集[t]读取器,以避免使用dataframe.as [..]为每个阅读器调用。有对原始类型和案例类的支持,所以我正在考虑以下内容:
def read[T <: Product](sql : String): Dataset[T] = {
import sparkSession.implicits._
val sqlContext = sparkSession.sqlContext
val df: DataFrame = sqlContext.read.option("query", sql).load()
df.as[T]
}
,但是我无法找到存储在数据集中的类型'错误的EDGODER。可以做这样的事情吗?
第二个周期:
def read[T <: Product](sql : String) : Dataset[T] = {
import sparkSession.implicits._
innerRead(sql)
}
private def innerRead[T <: Product : Encoder](sql : String): Dataset[T] = {
val sqlContext = sparkSession.sqlContext
val df: DataFrame = sqlContext.read.option("query", sql).load()
df.as[T]
}
以类型不匹配结束(foudn encoder [nothing],必需的encoder [t](。
我只是尝试导入newproductencoder,但结束了。
为了将DataFrame
转换为Dataset
,您需要具有Encoder
。您可以通过简单地添加T
上的上下文和Encoder
的上下文来完成此操作:
def read[T <: Product : Encoder](sql : String): Dataset[T] = {
import sparkSession.implicits._
val sqlContext = sparkSession.sqlContext
val df: DataFrame = sqlContext.read.option("query", sql).load()
df.as[T]
}
上下文是句法糖:
def read[T <: Product](sql : String)(implicit $ev: Encoder[T]): Dataset[T]
这意味着您需要在隐式上下文中拥有一个(又有一个(Encoder[T]
的实例。
这是需要的,因为as
方法本身需要此上下文绑定。
Spark本身可以通过导入(像您一样(对SparkSession
的隐含物,为您提供所需的大部分Encoder
S(原始词,String
S和case class
ES(。但是,这些必须在呼叫站点的隐式范围中可用,这意味着您想要拥有的可能更像以下内容:
def read[T <: Product : Encoder](spark: SparkSession, sql: String): Dataset[T] = {
import spark.implicits._
val df: DataFrame = spark.sqlContext.read.option("query", sql).load()
df.as[T]
}
val spark: SparkSession = ??? // your SparkSession object
import spark.implicits._
val ds: Dataset[YourType] = read[YourType](spark, "select something from a_table")
在您的第二个周期中,也许您需要将类型参数提供给内读呼叫:
innerRead[T](sql)