我想执行一个键入的转换以替换数据集上某些列的所有值。我知道使用"选择"是可能的,但是我希望将完整的数据集返回,其中包含特定列值而不是单个列。我也知道使用column方法可以直接且直接,但这被认为是一种不体的转换。要对键入转换做同样的事情并恢复完整的数据集,我正在使用mappartitions,但会遇到问题:
case class Listing(street: String, zip: Int, price: Int)
val list = List(Listing("Main St", 92323, 30000), Listing("1st St", 94331, 10000),Listing("Sunset Ave", 98283, 50000))
val ds = sc.parallelize(list).toDS
val colNames = ds.columns
val newDS = ds.mapPartitions{ iter => val newDSIter =
for (row <- iter) yield {
val newRow = for (i <- 0 until ds.columns.length) yield {
if (some_condition) {
//using reflection to get field value since the column to be
//processed will be dynamically known based on if condition
val value = row.getClass.getDeclaredMethod(colNames(i)).invoke(row).toString
//send 'value' to some function for processing and returning new value
} else {
//just return field value
row.getClass.getDeclaredMethod(colNames(i)).invoke(row).toString
}
} newRow
}
newDSIter
}
这给我以下错误:
error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
我更改了以下行: newrow.as [listing]
显示错误
error: value as is not a member of scala.collection.immutable.IndexedSeq[String]
告诉我,一个人对象没有被返回,而只是一组字符串。
执行键入转换后,这是返回完整数据集的正确方法,并且由于我将字符串集合而不是一个人对象而丢失了吗?
我的另一个问题是我对打字和未型转换的困惑。如果严格针对数据框架定义了模式,并且对其进行了一些转换,为什么它仍然认为它仍然被认为是未型的转换?或者,如果在数据集中调用了withColumn方法(而不是数据框架),并且将返回的值转换为数据集,则仍被视为未型转换?
val newDS = ds.withColumn("zip", some_func).as[Listing]
返回数据集。
编辑:
更新了行返回行(newRow)如下:
Listing.getClass.getMethods.find(x => x.getName == "apply" && x.isBridge).get
.invoke(Listing, newRow map (_.asInstanceOf[AnyRef]): _*).asInstanceOf[Listing]
在Spark-Shell中,这是我需要返回数据集[列表],但是在使用SBT编译代码时,获取错误:
error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
我能够首先通过将集合转换为案例类(请参阅编辑)并确保导入Spark.implicits_。