I created a schema with the following code:
val schema = new StructType().add("city", StringType, true).add("female", IntegerType, true).add("male", IntegerType, true)
Then I created an RDD from the text file:
val data = spark.sparkContext.textFile("cities.txt")
and converted it to an RDD of Rows so the schema could be applied:
val cities = data.map(line => line.split(";")).map(row => Row.fromSeq(row.zip(schema.toSeq)))
val citiesRDD = spark.sqlContext.createDataFrame(cities, schema)
This gives me the following error:
java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: scala.Tuple2 is not a valid external type for schema of string
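The `scala.Tuple2` in the error comes from `row.zip(schema.toSeq)`. Zipping pairs each split value with a schema field, so every element of the resulting `Row` is a tuple instead of a `String` or `Int`. The plain-Scala sketch below shows this. It assumes a hypothetical list of field names as a stand-in for the `StructField`s returned by `schema.toSeq`.

```scala
object ZipDemo {
  // Hypothetical stand-in for schema.toSeq (the real one contains StructFields)
  val fieldNames: Seq[String] = Seq("city", "female", "male")

  // Mirrors line.split(";") followed by row.zip(schema.toSeq)
  def zipped(line: String): Seq[(String, String)] =
    line.split(";").toSeq.zip(fieldNames)

  def main(args: Array[String]): Unit = {
    // Each element is a Tuple2 (value, field), the type the encoder rejects
    zipped("Bern;10;12").foreach(v => println(s"$v : ${v.getClass.getName}"))
  }
}
```

With Spark in the picture, `Row.fromSeq` receives these pairs as the column values, and the encoder then refuses a tuple where the schema declares `string`.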
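You don't need the schema to create a Row. You need the schema when you create the DataFrame. You also need some logic that converts the split line (which produces three strings) into integers.
Here is a minimal solution, without exception handling: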
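import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

val data = sc.parallelize(Seq("Bern;10;12")) // mock for real data
val schema = new StructType().add("city", StringType, true).add("female", IntegerType, true).add("male", IntegerType, true)
val cities = data.map { line =>
  // split the line into its three string fields
  val Array(city, female, male) = line.split(";")
  // build a Row whose value types match the schema (String, Int, Int)
  Row(city, female.toInt, male.toInt)
}
val citiesDF = sqlContext.createDataFrame(cities, schema)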
I usually use case classes to create DataFrames, because Spark can infer the schema from the case class:
// "schema" for the DataFrame; define it outside of the main method
case class MyRow(city: Option[String], female: Option[Int], male: Option[Int])

val data = sc.parallelize(Seq("Bern;10;12")) // mock for real data
import sqlContext.implicits._
val citiesDF = data.map { line =>
  val Array(city, female, male) = line.split(";")
  MyRow(Some(city), Some(female.toInt), Some(male.toInt))
}.toDF()
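Because `MyRow` uses `Option` fields, malformed values don't have to abort the job. They can be mapped to `None`, which Spark stores as `null`. The sketch below assumes the same `MyRow` case class and a hypothetical `parseLine` helper. The helper returns `None` for lines that don't have exactly three fields, and `None` in a numeric column when the value is not a valid `Int`.

```scala
import scala.util.Try

case class MyRow(city: Option[String], female: Option[Int], male: Option[Int])

object ParseDemo {
  // Hypothetical helper: skip lines without exactly three fields,
  // and turn unparseable numbers into None instead of throwing
  def parseLine(line: String): Option[MyRow] =
    line.split(";", -1) match {
      case Array(city, female, male) =>
        Some(MyRow(
          Option(city).filter(_.nonEmpty),
          Try(female.trim.toInt).toOption,
          Try(male.trim.toInt).toOption
        ))
      case _ => None
    }

  def main(args: Array[String]): Unit =
    Seq("Bern;10;12", "Zurich;n/a;7", "broken").foreach(l => println(parseLine(l)))
}
```

In the answer's setting, this could be applied with something like `data.flatMap(l => ParseDemo.parseLine(l)).toDF()`, assuming `sqlContext.implicits._` is imported. `flatMap` silently drops the lines for which `parseLine` returns `None`.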