使用映射的数据集的键入转换



我想执行一个键入的转换以替换数据集上某些列的所有值。我知道使用"选择"是可能的,但是我希望将完整的数据集返回,其中包含特定列值而不是单个列。我也知道使用column方法可以直接且直接,但这被认为是一种不体的转换。要对键入转换做同样的事情并恢复完整的数据集,我正在使用mappartitions,但会遇到问题:

case class Listing(street: String, zip: Int, price: Int)
val list = List(Listing("Main St", 92323, 30000), Listing("1st St", 94331, 10000),Listing("Sunset Ave", 98283, 50000))
val ds = sc.parallelize(list).toDS
val colNames = ds.columns
val newDS = ds.mapPartitions{ iter => val newDSIter = 
    for (row <- iter) yield {
      val newRow = for (i <- 0 until ds.columns.length) yield {
        if (some_condition) { 
          //using reflection to get field value since the column to be
          //processed will be dynamically known based on if condition 
          val value = row.getClass.getDeclaredMethod(colNames(i)).invoke(row).toString
          //send 'value' to some function for processing and returning new value
        } else { 
          //just return field value
          row.getClass.getDeclaredMethod(colNames(i)).invoke(row).toString
        }
      } newRow
    } 
 newDSIter
}

这给我以下错误:

error: Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.

我更改了以下行: newrow.as [listing]

显示错误

error: value as is not a member of scala.collection.immutable.IndexedSeq[String]

告诉我,一个人对象没有被返回,而只是一组字符串。

执行键入转换后,这是返回完整数据集的正确方法,并且由于我将字符串集合而不是一个人对象而丢失了吗?

我的另一个问题是我对打字和未型转换的困惑。如果严格针对数据框架定义了模式,并且对其进行了一些转换,为什么它仍然认为它仍然被认为是未型的转换?或者,如果在数据集中调用了withColumn方法(而不是数据框架),并且将返回的值转换为数据集,则仍被视为未型转换?

val newDS = ds.withColumn("zip", some_func).as[Listing]

返回数据集。

编辑:

更新了行返回行(newRow)如下:

Listing.getClass.getMethods.find(x => x.getName == "apply" && x.isBridge).get
.invoke(Listing, newRow map (_.asInstanceOf[AnyRef]): _*).asInstanceOf[Listing]

在Spark-Shell中,这是我需要返回数据集[列表],但是在使用SBT编译代码时,获取错误:

error: Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.

我能够首先通过将集合转换为案例类(请参阅编辑)并确保导入Spark.implicits_。

来解决此问题。

相关内容

  • 没有找到相关文章

最新更新