Scala noob在这里。还在努力学习语法。
我正在努力减少将测试数据转换为DataFrames所需的代码。以下是我现在拥有的:
def makeDf[T](seq: Seq[(Int, Int)], colNames: String*): Dataset[Row] = {
val context = session.sqlContext
import context.implicits._
seq.toDF(colNames: _*)
}
问题在于,上述方法仅采用形状为Seq[(Int, Int)]
的序列作为输入。我如何使它接受任何序列作为输入?我可以将输入形状更改为Seq[AnyRef]
,但代码无法将toDF
调用识别为有效符号。
我不知道该怎么做。有什么想法吗?谢谢
简短回答:
import scala.reflect.runtime.universe.TypeTag
def makeDf[T <: Product: TypeTag](seq: Seq[T], colNames: String*): DataFrame = ...
说明:
当您调用seq.toDF时,实际上使用的是SQLImplicits:中定义的隐式
implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]): DatasetHolder[T] = {
DatasetHolder(_sqlContext.createDataset(s))
}
这又需要产生编码器。问题是编码器只在某些类型上定义。特别是Product(即元组、case类等)您还需要添加TypeTag隐式,这样Scala就可以克服类型擦除(在运行时,无论泛型类型如何,所有序列都有类型序列。TypeTag提供了这方面的信息)。
作为侧节点,您不需要从会话中提取sqlcontext,只需使用:
import sparkSession.implicits._
正如@AssafMendelson已经解释的那样,您不能创建Any
的Dataset
的真正原因是Spark需要Encoder
来将对象从它们的JVM表示转换为其内部表达,并且Spark不能保证为Any
生成这样的Encoder
类型
Assaf的答案是正确的,并且会起作用
然而,IMHO,它的限制性太强了,因为它只适用于Products
(元组和事例类)-即使这包括大多数用例,也有少数用例被排除在外。
因为,您真正需要的是Encoder
,所以您可以将责任留给客户。在大多数情况下,只需要调用import spark.implicits._
就可以将它们放入作用域
因此,我认为这将是最普遍的解决方案。
import org.apache.spark.sql.{DataFrame, Dataset, Encoder, SparkSession}
// Implicit SparkSession to make the call to further methods more transparent.
implicit val spark = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._
def makeDf[T: Encoder](seq: Seq[T], colNames: String*)
(implicit spark: SparkSession): DataFrame =
spark.createDataset(seq).toDF(colNames: _*)
def makeDS[T: Encoder](seq: Seq[T])
(implicit spark: SparkSession): Dataset[T] =
spark.createDataset(seq)
注意:这基本上是在重新发明Spark中已经定义的函数