如何将SPARK SQL中的数据类型转换为特定数据类型,但RDD结果转换为特定类



>我正在读取一个csv文件,需要创建一个RDDSchema
我通过使用 sqlContext.csvFile 读取该文件

val testfile = sqlContext.csvFile("file")
testfile.registerTempTable(testtable)

我想更改选择一些字段并返回这些字段的 RDD 类型例如:类测试(ID:字符串,order_date:日期,名称:字符串,值:双精度)

使用 sqlContext.sql("Select col1, col2, col3, col4 from ...)

 val testfile = sqlContext.sql("Select col1, col2, col3, col4 FROM testtable).collect
testfile.getClass
Class[_ <: Array[org.apache.spark.sql.Row]] = class [Lorg.apache.spark.sql.Row;

所以我想将 col1 更改为双精度,将 col2 更改为日期,将列 3 更改为字符串?有没有办法在sqlContext中做到这一点.sql或者我必须对结果运行一个映射函数,然后将其转回RDD。我试图在一个语句中执行该项目,但出现此错误:

 val old_rdd : RDD[Test] = sqlContext.sql("SELECT col, col2, col3,col4  FROM testtable").collect.map(t => (t(0) : String ,dateFormat.parse(dateFormat.format(1)),t(2) : String, t(3) : Double))

我遇到的问题是赋值不会在RDD[测试]上产生,其中测试是一个定义的类

错误是说map命令是作为数组类而不是RDD类出现的

 found   : Array[edu.model.Test]
 [error]  required: org.apache.spark.rdd.RDD[edu.model.Test]

假设你有一个这样的案例类:

case class Test(
  ID: String, order_date: java.sql.Date, Name: String, value: Double)

由于您使用默认参数csvFile加载数据,因此它不会执行任何架构推理,并且您的数据存储为纯字符串。假设没有其他字段:

val df = sc.parallelize(
  ("ORD1", "2016-01-02", "foo", "2.23") ::
  ("ORD2", "2016-07-03", "bar", "9.99") :: Nil
).toDF("col1", "col2", "col3", "col4")

您尝试使用地图是错误的,原因不止一个:

  • 您使用的函数使用不正确的类型批注各个值。不仅Row.apply属于Int => Any类型,而且您的数据表包含不应包含任何Double
  • 由于您collect(这在这里没有意义),您将所有数据提取给驱动程序,结果是本地的Array而不是RDD
  • 最后,如果之前的所有问题都解决了,(String, Date, String, Double)显然不是一个Test

处理此问题的一种方法:

import org.apache.spark.sql.Row
import org.apache.spark.rdd.RDD
val casted = df.select(
  $"col1".alias("ID"),
  $"col2".cast("date").alias("order_date"),
  $"col3".alias("name"),
  $"col4".cast("double").alias("value")
)
val tests: RDD[Test] = casted.map {
  case Row(id: String, date: java.sql.Date, name: String, value: Double) =>
    Test(id, date, name, value)
}

您也可以尝试使用新的Dataset API,但它远非稳定:

casted.as[Test].rdd

最新更新