这是Spark 1.3中的回归错误吗



spark SQL 1.2.1中没有弃用警告,以下代码在1.3 中停止工作

在1.2.1中工作(没有任何弃用警告)

 val sqlContext = new HiveContext(sc)
 import sqlContext._
 val jsonRDD = sqlContext.jsonFile(jsonFilePath)
 jsonRDD.registerTempTable("jsonTable")
 val jsonResult = sql(s"select * from jsonTable")
 val foo = jsonResult.zipWithUniqueId().map {
   case (Row(...), uniqueId) => // do something useful
   ...
 }
 foo.registerTempTable("...")

在1.3.0中停止工作(只是不编译,我所做的只是更改为1.3)

jsonResult.zipWithUniqueId() //since RDDApi doesn't implement that method

不工作的解决方法:

尽管这可能会给我一个RDD[行]:

jsonResult.rdd.zipWithUniqueId()  

现在这不起作用,因为RDD[Row]没有registerTempTable方法,当然

     foo.registerTempTable("...")

以下是我的问题

  1. 有变通办法吗?(例如,我只是做错了吗?)
  2. 这是个虫子吗?(我认为,在没有@deprecated警告的情况下,任何停止编译的操作都是回归错误)

这不是一个bug,但很抱歉造成混淆!在Spark 1.3之前,由于API仍在不断变化,Spark SQL被标记为Alpha组件。通过Spark 1.3,我们毕业并稳定了API。在文档中可以找到移植时需要做什么的完整描述。

我也可以回答您的具体问题,并就我们为什么进行这些更改提供一些理由

在1.3.0中停止工作(只是不编译,我所做的只是更改为1.3)jsonResult.zipWithUniqueId() //since RDDApi doesn't implement that method

DataFrames现在是Scala和Java的统一接口。但是,由于我们必须在1.X的其余部分保持与现有RDD API的兼容性,因此DataFrames不是RDD s。要获得RDD表示,您可以调用df.rdddf.javaRDD

此外,因为我们担心隐式转换可能会出现一些混乱,所以我们要求您必须显式调用rdd.toDF才能从RDD进行转换。但是,只有当RDD包含从Product继承的对象(即元组或事例类)时,此转换才会自动工作。

回到最初的问题,如果你想对具有任意模式的行进行do to转换,你需要在映射操作后明确地告诉Spark SQL数据的结构(因为编译器不能)。

import org.apache.spark.sql.types._
val jsonData = sqlContext.jsonRDD(sc.parallelize("""{"name": "Michael", "zip": 94709}""" :: Nil))
val newSchema = 
  StructType(
    StructField("uniqueId", IntegerType) +: jsonData.schema.fields)
val augmentedRows = jsonData.rdd.zipWithUniqueId.map { 
  case (row, id) =>
    Row.fromSeq(id +: row.toSeq)
}
val newDF = sqlContext.createDataFrame(augmentedRows, newSchema)

相关内容

  • 没有找到相关文章

最新更新