如何用泛型case类实现trait,在Scala中创建一个数据集



我想创建一个应该用case类T实现的Scala trait,该trait只是简单地加载数据并将其转换为T类型的Spark Dataset。我得到了没有编码器可以存储的错误,我认为这是因为Scala不知道T应该是case类。我怎么告诉编译器?我在某个地方看到我应该提到Product,但是没有定义这样的类。请随意建议其他方法来做到这一点!

我有以下代码,但它没有编译错误:42:错误:无法找到存储在数据集中的类型的编码器。通过导入sqlContext.implicits._,可以支持基本类型(Int, String等)和产品类型(case类)[信息]。as [T]

我正在使用Spark 1.6.1

代码:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Dataset, SQLContext}    
/**
      * A trait that moves data on Hadoop with Spark based on the location and the granularity of the data.
      */
    trait Agent[T] {
      /**
        * Load a Dataframe from the location and convert into a Dataset
        * @return Dataset[T]
        */
      protected def load(): Dataset[T] = {
        // Read in the data
        SparkContextKeeper.sqlContext.read
          .format("com.databricks.spark.csv")
          .load("/myfolder/" + location + "/2016/10/01/")
          .as[T]
      }
    }

你的代码缺少3个东西:

  • 事实上,你必须让编译器知道T是Product的子类(所有Scala case类和Tuples的超类)
  • 编译器还需要实际case类的TypeTagClassTag。这是Spark隐式使用来克服类型擦除
  • sqlContext.implicits._进口
不幸的是,你不能在trait中添加带有上下文边界的类型参数,所以最简单的解决方法是使用abstract class:
import scala.reflect.runtime.universe.TypeTag
import scala.reflect.ClassTag
abstract class Agent[T <: Product : ClassTag : TypeTag] {
  protected def load(): Dataset[T] = { 
    val sqlContext: SQLContext = SparkContextKeeper.sqlContext
    import sqlContext.implicits._
    sqlContext.read.// same... 
  }
}

显然,这并不等同于使用trait,并且可能暗示该设计不是最适合该工作的。另一种选择是将load放在对象中,并将类型参数移动到方法:

object Agent {
  protected def load[T <: Product : ClassTag : TypeTag](): Dataset[T] = {
    // same...
  }
}

哪一个更可取,主要取决于你要在哪里和如何调用load,以及你打算用结果做什么。

您需要采取两个行动:

  1. 添加import sparkSession.implicits._在您的进口
  2. 设置你的性状为trait Agent[T <: Product]

相关内容

  • 没有找到相关文章