Scala:如何将任何泛型序列作为该方法的输入



Scala noob在这里。还在努力学习语法。

我正在努力减少将测试数据转换为DataFrames所需的代码。以下是我现在拥有的:

def makeDf[T](seq: Seq[(Int, Int)], colNames: String*): Dataset[Row] = {
val context = session.sqlContext
import context.implicits._
seq.toDF(colNames: _*)
}

问题在于,上述方法仅采用形状为Seq[(Int, Int)]的序列作为输入。我如何使它接受任何序列作为输入?我可以将输入形状更改为Seq[AnyRef],但代码无法将toDF调用识别为有效符号。

我不知道该怎么做。有什么想法吗?谢谢

简短回答:

import scala.reflect.runtime.universe.TypeTag
def makeDf[T <: Product: TypeTag](seq: Seq[T], colNames: String*): DataFrame = ...

说明:

当您调用seq.toDF时,实际上使用的是SQLImplicits:中定义的隐式

implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]): DatasetHolder[T] = {
DatasetHolder(_sqlContext.createDataset(s))
}

这又需要产生编码器。问题是编码器只在某些类型上定义。特别是Product(即元组、case类等)您还需要添加TypeTag隐式,这样Scala就可以克服类型擦除(在运行时,无论泛型类型如何,所有序列都有类型序列。TypeTag提供了这方面的信息)。

作为侧节点,您不需要从会话中提取sqlcontext,只需使用:

import sparkSession.implicits._

正如@AssafMendelson已经解释的那样,您不能创建AnyDataset的真正原因是Spark需要Encoder来将对象从它们的JVM表示转换为其内部表达,并且Spark不能保证为Any生成这样的Encoder类型

Assaf的答案是正确的,并且会起作用
然而,IMHO,它的限制性太强了,因为它只适用于Products(元组和事例类)-即使这包括大多数用例,也有少数用例被排除在外。

因为,您真正需要的是Encoder,所以您可以将责任留给客户。在大多数情况下,只需要调用import spark.implicits._就可以将它们放入作用域
因此,我认为这将是最普遍的解决方案。

import org.apache.spark.sql.{DataFrame, Dataset, Encoder, SparkSession}
// Implicit SparkSession to make the call to further methods more transparent.
implicit val spark = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._
def makeDf[T: Encoder](seq: Seq[T], colNames: String*)
(implicit spark: SparkSession): DataFrame =
spark.createDataset(seq).toDF(colNames: _*)
def makeDS[T: Encoder](seq: Seq[T])
(implicit spark: SparkSession): Dataset[T] =
spark.createDataset(seq)

注意:这基本上是在重新发明Spark中已经定义的函数

相关内容

  • 没有找到相关文章

最新更新