I am running the following code to create a DataFrame from a text file.
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SQLContext, Row}
import org.apache.spark.sql.types.{StructType, StringType, StructField}
/**
* Created by PSwain on 6/19/2016.
*/
object RddToDataframe extends App {
val scnf=new SparkConf().setAppName("RddToDataFrame").setMaster("local[1]")
val sc = new SparkContext(scnf)
val sqlContext = new SQLContext(sc)
val employeeRdd=sc.textFile("C:\\Users\\pswain\\IdeaProjects\\test1\\src\\main\\resources\\employee")
//Creating schema
val employeeSchemaString="id name age"
val schema = StructType(employeeSchemaString.split(",").map( colNmae => StructField(colNmae,StringType,true)))
//Creating RowRdd
val rowRdd= employeeRdd.map(row => row.split(",")).map(row => Row(row(0).trim.toInt,row(1),row(2).trim.toInt))
//Creating dataframe = RDD[rowRdd] + schema
val employeeDF=sqlContext.createDataFrame(rowRdd,schema).registerTempTable("Employee")
sqlContext.sql("select * from Employee").show()
}
But when I run it in IntelliJ, I get the type-mismatch error below. I cannot figure out why this error occurs, since I am only converting strings to integers. The contents of the employee file are shown below; the records appear here on a single line, but in the file each record is on its own line.
1201,Satish,25
1202,Krishna,28
1203,Amith,39
1204,Javed,23
1205,Prudvi,23
16/06/19 15:18:58 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
scala.MatchError: 1201 (of class java.lang.Integer)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:295)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:294)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
The schema is created with every column type defined as StringType:
val schema = StructType(employeeSchemaString.split(",").map( colNmae => StructField(colNmae,StringType,true)))
But the RowRDD has columns of type Int, String, and Int. Here is the working code:
import org.apache.spark.sql.types.IntegerType  // IntegerType must be imported in addition to the types imported above
val structType = {
  val id = StructField("id", IntegerType)
  val name = StructField("name", StringType)
  val age = StructField("age", IntegerType)
  new StructType(Array(id, name, age))
}
}
val rowRdd= employeeRdd.map(row => row.split(",")).map(row => Row(row(0).trim().toInt,row(1),row(2).trim().toInt))
sqlContext.createDataFrame(rowRdd, structType).registerTempTable("Employee")
sqlContext.sql("select * from Employee").show()