>我正在读取一个csv文件,需要创建一个RDDSchema
我通过使用 sqlContext.csvFile 读取该文件
val testfile = sqlContext.csvFile("file")
testfile.registerTempTable(testtable)
我想更改选择一些字段并返回这些字段的 RDD 类型例如:类测试(ID:字符串,order_date:日期,名称:字符串,值:双精度)
使用 sqlContext.sql("Select col1, col2, col3, col4 from ...)
val testfile = sqlContext.sql("Select col1, col2, col3, col4 FROM testtable).collect
testfile.getClass
Class[_ <: Array[org.apache.spark.sql.Row]] = class [Lorg.apache.spark.sql.Row;
所以我想将 col1 更改为双精度,将 col2 更改为日期,将列 3 更改为字符串?有没有办法在sqlContext中做到这一点.sql或者我必须对结果运行一个映射函数,然后将其转回RDD。我试图在一个语句中执行该项目,但出现此错误:
val old_rdd : RDD[Test] = sqlContext.sql("SELECT col, col2, col3,col4 FROM testtable").collect.map(t => (t(0) : String ,dateFormat.parse(dateFormat.format(1)),t(2) : String, t(3) : Double))
我遇到的问题是赋值不会在RDD[测试]上产生,其中测试是一个定义的类
错误是说map命令是作为数组类而不是RDD类出现的
found : Array[edu.model.Test]
[error] required: org.apache.spark.rdd.RDD[edu.model.Test]
假设你有一个这样的案例类:
case class Test(
ID: String, order_date: java.sql.Date, Name: String, value: Double)
由于您使用默认参数csvFile
加载数据,因此它不会执行任何架构推理,并且您的数据存储为纯字符串。假设没有其他字段:
val df = sc.parallelize(
("ORD1", "2016-01-02", "foo", "2.23") ::
("ORD2", "2016-07-03", "bar", "9.99") :: Nil
).toDF("col1", "col2", "col3", "col4")
您尝试使用地图是错误的,原因不止一个:
- 您使用的函数使用不正确的类型批注各个值。不仅
Row.apply
属于Int => Any
类型,而且您的数据表包含不应包含任何Double
值 - 由于您
collect
(这在这里没有意义),您将所有数据提取给驱动程序,结果是本地的Array
而不是RDD
- 最后,如果之前的所有问题都解决了,
(String, Date, String, Double)
显然不是一个Test
处理此问题的一种方法:
import org.apache.spark.sql.Row
import org.apache.spark.rdd.RDD
val casted = df.select(
$"col1".alias("ID"),
$"col2".cast("date").alias("order_date"),
$"col3".alias("name"),
$"col4".cast("double").alias("value")
)
val tests: RDD[Test] = casted.map {
case Row(id: String, date: java.sql.Date, name: String, value: Double) =>
Test(id, date, name, value)
}
您也可以尝试使用新的Dataset
API,但它远非稳定:
casted.as[Test].rdd