How to convert an Array[String] to the correct schema types



I've run into a strange problem while trying to convert the fields of an RDD[Array[String]] into the correct types specified in a schema, in order to build a Spark SQL DataFrame.

I have an RDD[Array[String]] and a StructType named schema that specifies the type of each field. What I've done so far is:

sqlContext.createDataFrame(
  inputLines.map { rowValues =>
    RowFactory.create(
      rowValues.zip(schema.toSeq).map { case (value, struct) =>
        struct.dataType match {
          case BinaryType  => value.toCharArray().map(ch => ch.toByte)
          case ByteType    => value.toByte
          case BooleanType => value.toBoolean
          case DoubleType  => value.toDouble
          case FloatType   => value.toFloat
          case ShortType   => value.toShort
          case DateType    => value
          case IntegerType => value.toInt
          case LongType    => value.toLong
          case _           => value
        }
      })
  },
  schema)

But I get this exception:

java.lang.RuntimeException: Failed to convert value [Ljava.lang.Object;@6e9ffad1 (class of class [Ljava.lang.Object;}) with the type of IntegerType to JSON

when the toJSON method is called...

Do you have any idea why this happens, and what I should do to fix it?

As requested, here is an example:

val schema = StructType(Seq(StructField("id", IntegerType), StructField("val", StringType)))
val inputLines = sc.parallelize(Array(Array("1", "This is a line for testing"),
                                      Array("2", "The second line")))

You are passing the whole Array as a single argument to RowFactory.create, so the entire array becomes the value of the first column; toJSON then fails when it tries to serialize that Object[] as an IntegerType.

If you look at its method signature:

public static Row create(Object ... values) 

it expects a varargs list of Object values.

So you just need to convert the array into a varargs list using the : _* syntax.
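
To see what the : _* syntax does on its own, here is a minimal standalone sketch (hypothetical names, unrelated to Spark):

def sum(xs: Int*): Int = xs.sum   // a varargs method

val arr = Array(1, 2, 3)
// sum(arr)     // does not compile: Array[Int] is not Int*
sum(arr: _*)    // : _* expands the array into varargs; returns 6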

sqlContext.createDataFrame(
  inputLines.map { rowValues =>
    Row(                       // RowFactory.create is the Java API; use Row.apply instead
      rowValues.zip(schema.toSeq).map { case (value, struct) =>
        struct.dataType match {
          case BinaryType  => value.toCharArray().map(ch => ch.toByte)
          case ByteType    => value.toByte
          case BooleanType => value.toBoolean
          case DoubleType  => value.toDouble
          case FloatType   => value.toFloat
          case ShortType   => value.toShort
          case DateType    => value
          case IntegerType => value.toInt
          case LongType    => value.toLong
          case _           => value
        }
      }: _*                    // <-- expand the array into varargs here
    )
  },
  schema)

In the code above, I replaced RowFactory.create with Row.apply and passed the arguments as varargs.

Alternatively, you can use the Row.fromSeq method, as in the refactored version below.
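
For instance, both of the following build the same two-column row:

Row(1, "a")                 // varargs via Row.apply
Row.fromSeq(Seq(1, "a"))    // takes the whole sequence directly, no : _* needed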

Refactoring a bit:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

def convertTypes(value: String, struct: StructField): Any = struct.dataType match {
  case BinaryType  => value.toCharArray().map(ch => ch.toByte)
  case ByteType    => value.toByte
  case BooleanType => value.toBoolean
  case DoubleType  => value.toDouble
  case FloatType   => value.toFloat
  case ShortType   => value.toShort
  case DateType    => value
  case IntegerType => value.toInt
  case LongType    => value.toLong
  case _           => value
}
val schema = StructType(Seq(StructField("id",IntegerType),
                            StructField("val",StringType)))
val inputLines = sc.parallelize(Array(Array("1","This is a line for testing"), 
                                      Array("2","The second line")))
val rowRdd = inputLines.map{ array => 
  Row.fromSeq(array.zip(schema.toSeq)
                   .map{ case (value, struct) => 
                           convertTypes(value, struct) })
}
val df = sqlContext.createDataFrame(rowRdd, schema)
df.toJSON.collect 
// Array({"id":1,"val":"This is a line for testing"},
//       {"id":2,"val":"The second line"})
