I'm running into a strange problem while trying to convert the fields of an RDD[Array[String]] to the correct types specified in a schema, in order to build a Spark SQL DataFrame. I have an RDD[Array[String]] and a StructType named schema that specifies the type of each field. What I've done so far is:
sqlContext.createDataFrame(
  inputLines.map( rowValues =>
    RowFactory.create(rowValues.zip(schema.toSeq)
      .map{ case (value, struct) =>
        struct.dataType match {
          case BinaryType => value.toCharArray().map(ch => ch.toByte)
          case ByteType => value.toByte
          case BooleanType => value.toBoolean
          case DoubleType => value.toDouble
          case FloatType => value.toFloat
          case ShortType => value.toShort
          case DateType => value
          case IntegerType => value.toInt
          case LongType => value.toLong
          case _ => value
        }
      })), schema)
But I get this exception:
java.lang.RuntimeException: Failed to convert value [Ljava.lang.Object;@6e9ffad1 (class of class [Ljava.lang.Object;}) with the type of IntegerType to JSON
when calling the toJSON method...

Do you know why this is happening, and what can I do to fix it?
As requested, here is an example:
val schema = StructType(Seq(StructField("id",IntegerType),StructField("val",StringType)))

val inputLines = sc.parallelize(Array(
  Array("1","This is a line for testing"),
  Array("2","The second line")))
You are passing the Array as the single argument to RowFactory.create.

If you look at its method signature:

public static Row create(Object ... values)

it expects a varargs list of Objects. So you just need to expand your array into a varargs list using the : _* syntax.
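To see the difference in isolation, here is a minimal sketch (countArgs is a made-up helper, not part of any API): a varargs function that receives an Array passed directly sees a single argument, while : _* expands it into individual arguments.

def countArgs(values: Any*): Int = values.length

val arr = Array[Any](1, "a", true)
countArgs(arr)      // 1 -- the whole array arrives as one argument (the bug above)
countArgs(arr: _*)  // 3 -- the array is expanded into three separate arguments

Applied to your code, the fixed version looks like this: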
sqlContext.createDataFrame(inputLines.map( rowValues =>
  Row( // RowFactory.create is a Java API; use Row.apply instead
    rowValues.zip(schema.toSeq)
      .map{ case (value, struct) => struct.dataType match {
        case BinaryType => value.toCharArray().map(ch => ch.toByte)
        case ByteType => value.toByte
        case BooleanType => value.toBoolean
        case DoubleType => value.toDouble
        case FloatType => value.toFloat
        case ShortType => value.toShort
        case DateType => value
        case IntegerType => value.toInt
        case LongType => value.toLong
        case _ => value
      }
    } : _* // <-- expand the mapped array into varargs here
  )),
  schema)
In the code above, I replaced RowFactory.create with Row.apply and passed the arguments as varargs.
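As a quick sanity check (assuming the DataFrame returned above is bound to a val named df), its schema should now line up with the two fields:

df.printSchema()
// root
//  |-- id: integer (nullable = true)
//  |-- val: string (nullable = true)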
Alternatively, you can use the Row.fromSeq method.
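The two are interchangeable: Row.fromSeq takes the Seq directly, so no splat is needed. For example, using the first row of the sample data:

import org.apache.spark.sql.Row

val fields = Seq(1, "This is a line for testing")
Row(fields: _*)      // expands the Seq into varargs for Row.apply
Row.fromSeq(fields)  // equivalent: takes the Seq as-is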
Refactoring a bit:
// Convert a raw string to the Scala value required by the field's DataType.
def convertTypes(value: String, struct: StructField): Any = struct.dataType match {
  case BinaryType => value.toCharArray().map(ch => ch.toByte)
  case ByteType => value.toByte
  case BooleanType => value.toBoolean
  case DoubleType => value.toDouble
  case FloatType => value.toFloat
  case ShortType => value.toShort
  case DateType => value
  case IntegerType => value.toInt
  case LongType => value.toLong
  case _ => value
}
val schema = StructType(Seq(StructField("id",IntegerType),
StructField("val",StringType)))
val inputLines = sc.parallelize(Array(Array("1","This is a line for testing"),
Array("2","The second line")))
val rowRdd = inputLines.map{ array =>
Row.fromSeq(array.zip(schema.toSeq)
.map{ case (value, struct) =>
convertTypes(value, struct) })
}
val df = sqlContext.createDataFrame(rowRdd, schema)
df.toJSON.collect
// Array({"id":1,"val":"This is a line for testing"},
// {"id":2,"val":"The second line"})