无法使用case类从行的RDD创建数据框



使用Spark 2。x,似乎我不能使用由case类组成的行RDD创建数据框架。

它在Spark 1.6上工作得很好。X,但在2上失败。运行时异常:

java.lang.RuntimeException: Timestamp is not a valid external type for schema of struct<seconds:bigint,nanos:int>

前面是一堆由Catalyst生成的代码。

下面是代码片段(我正在做的事情的简化版本):

package main
import org.apache.spark.sql.{SparkSession, Row}
import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType}
object Test {
  case class Timestamp(seconds: Long, nanos: Int)
  val TIMESTAMP_TYPE = StructType(List(
    StructField("seconds", LongType, false),
    StructField("nanos", IntegerType, false)
  ))
  val SCHEMA = StructType(List(
    StructField("created_at", TIMESTAMP_TYPE, true)
  ))
  def main(args: Array[String]) {
    val spark = SparkSession.builder().getOrCreate()
    val rowRDD = spark.sparkContext.parallelize(Seq((0L, 0))).map {
      case (seconds: Long, nanos: Int) => {
        Row(Timestamp(seconds, nanos))
      }
    }
    spark.createDataFrame(rowRDD, SCHEMA).show(1)
  }
}

我不确定这是Spark的错误还是我在文档中错过的东西(我知道Spark 2。x引入了运行时行编码验证,也许这是相关的)

帮助非常感谢

我不确定这是否是一个bug,但混合动态类型的Row, case类和显式模式没有多大意义。使用Rows和模式:

import collection.mutable._
import collection.JavaConverters._
spark.createDataFrame(ArrayBuffer(Row(Row(0L, 0))).asJava, SCHEMA)

或case类:

import spark.implicits._
Seq(Tuple1(Timestamp(0L, 0))).toDF("created_at")

否则你只是在做同样的工作两次。

注意:

如果你想表达字段可以为空,你可以使用Options。例如

case class Record(created_at: Option[Timestamp])
case class Timestamp(seconds: Long, nanos: Option[Int])
Seq(Record(Some(Timestamp(0L, Some(0))))).toDF

将生成模式,其中created_atcreated_at.milliseconds可以为NULL,但如果created_at不是NULL,则必须设置created_at.seconds

相关内容

  • 没有找到相关文章

最新更新