带有通用Dataset[T]参数的Scala Spark函数,同时返回Dataset[T]



我知道Spark要想将数据帧转换为某个类的数据集[T],就需要一个编码器。然而,我通常可以用编码器的主方法进行处理,并调用.as[MyClass],如下所示:

val df = spark.read.parquet("something")
val myDS = df.as[MyClass]

只要有为MyClass定义的编码器,就可以工作

我想创建一个类似的函数

def hello[T](inputDataSet: Dataset[T])(implicit spark: SparkSession): Dataset[T] = {
val replacedDataFrame = inputDataSet
// do some transformation as Dataframe
.as[T]
replacedDataFrame
}

其中我还返回CCD_ 3。然而,当我尝试投射数据帧CCD_ 4时;找不到隐含词";。我只是想,既然它能够理解我通过Dataset[T]时在做什么,它就应该能够理解相反的情况,但我想不是。有办法绕过这个吗?

示例用例:

// function to replace a column with values from another DataSet
def swapColumnValue[T,K](inputDS: Dataset[T], joinableDS: Dataset[K])(implicit spark: SparkSession): Dataset[T] = {
val replacedDataFrame = inputDS
.join(broadcast(joinable), "col1") // exists in "joinableDS" and "inputDS"
.withColumnRenamed("col1", "to-drop")
.withColumnRenamed("col2", "col1") // "col2" exists only in "joinableDS"
.drop("to-drop") 
.as[T]
replacedDataFrame
}

注意,这不是我唯一的用例。但这里的问题是,我传入一个Dataset[T],在对它进行一些操作后,我也想将返回指定为Dataset[T]。一旦我做了join,它就会将Dataset转换为Dataframe,并且它不知道什么类被定义为T

试试这个,我很难解释,但它解决了你得到的错误消息:

import org.apache.spark.sql.functions._
import org.apache.spark.sql._
import spark.implicits._
import org.apache.spark.sql.Encoders
case class T(name: String, age: Long)
case class K(name: String, age2: Long)
val dt = Seq(T("Andy", 32), T("John", 33), T("Bob", 33)).toDS()
dt.show()
val dk = Seq(K("Andy", 32), K("John", 133), K("Bob", 245)).toDS()
dk.show()
implicit val sqlContext: SparkSession = spark
def swapColumnValue[T,K](inputDS: Dataset[T], joinableDS: Dataset[K])(implicit spark: SparkSession, encoder: Encoder[T]): Dataset[T] = {
//def swapColumnValue[T,K](inputDS: Dataset[T], joinableDS: Dataset[K]) : DataFrame = {
val replacedDataFrame = inputDS
.join(broadcast(joinableDS), "name")  
.withColumnRenamed("age", "to-drop")
.withColumnRenamed("age2", "age")  
.drop("to-drop") 
.as[T]

replacedDataFrame
}
val ds = swapColumnValue(dt,dk) 
ds.show(false)

退货:

+----+---+
|name|age|
+----+---+
|Andy| 32|
|John| 33|
| Bob| 33|
+----+---+
+----+----+
|name|age2|
+----+----+
|Andy|  32|
|John| 133|
| Bob| 245|
+----+----+
+----+---+
|name|age|
+----+---+
|Andy|32 |
|John|133|
|Bob |245|
+----+---+

ds是T型数据集。

最新更新