Specifying a schema when converting a Spark RDD to a DataFrame



Spark does not seem to apply a schema with types other than String to a DataFrame when converting from an RDD of Row objects. I have tried this on both Spark 1.4 and 1.5.

Code snippet (Java API):
JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(jssc, String.class, String.class,
                StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet);
directKafkaStream.foreachRDD(rdd -> {
    rdd.foreach(x -> System.out.println("x._1() = " + x._1()));
    rdd.foreach(x -> System.out.println("x._2() = " + x._2()));
    JavaRDD<Row> rowRdd = rdd.map(x -> RowFactory.create(x._2().split("\t")));
    rowRdd.foreach(x -> System.out.println("x = " + x));
    SQLContext sqlContext = SQLContext.getOrCreate(rdd.context());
    StructField id = DataTypes.createStructField("id", DataTypes.IntegerType, true);
    StructField name = DataTypes.createStructField("name", DataTypes.StringType, true);
    List<StructField> fields = Arrays.asList(id, name);
    StructType schema = DataTypes.createStructType(fields);
    DataFrame sampleDf = sqlContext.createDataFrame(rowRdd, schema);
    sampleDf.printSchema();
    sampleDf.show();
    return null;
});
jssc.start();
jssc.awaitTermination();

If StringType is specified for the id field, it produces the following output:

x._1() = null
x._2() = 1  item1
x = [1,item1]
root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
+---+-----+
| id| name|
+---+-----+
|  1|item1|
+---+-----+
With the code as given (IntegerType for id), it throws an error:
x._1() = null
x._2() = 1  item1
x = [1,item1]
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
15/09/16 04:13:33 ERROR JobScheduler: Error running job streaming job 1442402013000 ms.0
java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Integer
    at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106)
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getInt(rows.scala:40)
    at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getInt(rows.scala:220)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$IntConverter$.toScalaImpl(CatalystTypeConverters.scala:358)

A similar issue was reported on the Spark Confluence, but it is marked as resolved in version 1.3.

You are mixing up two different things: the runtime data types and the DataFrame schema. When you create a Row like this:

RowFactory.create(x._2().split("\t"))

you get Row(_: String, _: String), but your schema says you have Row(_: Integer, _: String). Since there is no automatic type casting, you get this error.
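The failure can be reproduced in plain Java, without Spark: the schema makes Spark read the field as an Integer, but the Row actually holds a String, so the unboxing cast fails (a minimal illustration; the class name is made up for this sketch):

```java
// Demonstrates the ClassCastException from the stack trace: a field
// that physically holds a String cannot be unboxed as an Integer.
public class CastDemo {
    public static void main(String[] args) {
        Object field = "1";               // what the Row actually contains
        try {
            int id = (Integer) field;     // what IntegerType makes Spark attempt
            System.out.println(id);
        } catch (ClassCastException e) {
            // java.lang.String cannot be cast to java.lang.Integer
            System.out.println("ClassCastException: " + e.getMessage());
        }
    }
}
```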

To make it work, you can either convert the values when you create the Row, or define id as StringType and use the Column.cast method after creating the DataFrame.
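A minimal sketch of the first approach: parse each field to the type declared in the schema before building the Row. Only the parsing step is shown here in plain Java; the class and method names (`LineParser`, `parseLine`) are made up for illustration:

```java
// Parse a tab-separated line into correctly typed values, so the
// runtime types match the declared schema (IntegerType, StringType).
public class LineParser {
    // Returns {Integer id, String name} for a line like "1\titem1".
    static Object[] parseLine(String line) {
        String[] parts = line.split("\t");
        // Integer.parseInt produces an int that boxes to java.lang.Integer,
        // matching DataTypes.IntegerType in the schema.
        return new Object[] { Integer.parseInt(parts[0]), parts[1] };
    }

    public static void main(String[] args) {
        Object[] fields = parseLine("1\titem1");
        System.out.println(fields[0].getClass().getSimpleName()); // Integer
        System.out.println(fields[1]);                            // item1
    }
}
```

Inside the streaming job this would become something like `rdd.map(x -> RowFactory.create(parseLine(x._2())))`, since `RowFactory.create` takes `Object...` varargs. The alternative is to keep both fields as StringType and cast afterwards, e.g. `sampleDf.withColumn("id", sampleDf.col("id").cast("integer"))`.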
