I am trying to create a DataFrame in a Kafka-Spark streaming job. I can map the values to a case class successfully, but whenever I call the toDF method I get this error:

value toDF is not a member of Array[WeatherEvent]
[error] possible cause: maybe a semicolon is missing before `value toDF'?
[error]     }).toDF("longitude", "latitude", "country", "sunrise", "sunset", "temperature", "temperatureMin", "temperatureMax",
[error]        ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 2 s, completed Sep 27, 2017 11:49:23 AM
Here is my code:
val inputStream = KafkaUtils.createDirectStream(ssc, PreferConsistent, Subscribe[String, String ](Array("test"), kafkaParams))
// val json = parse(inputStream)
val processedStream = inputStream
.flatMap(record => record.value.split(" ").map(payload => {
//val ts = Timestamp.valueOf(payload(3))
WeatherEvent(payload(0).toDouble, payload(1).toDouble, payload(2).toString , payload(3).toInt,
payload(4).toInt, payload(5).toDouble, payload(6).toDouble, payload(7).toDouble,
payload(8).toDouble, payload(9).toInt, payload(10).toInt, payload(11).toInt,
payload(12).toDouble, payload(13).toDouble)
}).toDF("longitude", "latitude", "country", "sunrise", "sunset", "temperature", "temperatureMin", "temperatureMax",
"pressure", "humidity", "cloudiness", "id", "wind_speed", "wind_deg")
)
Thanks!
toDF() is an implicit method defined in SQLContext (brought into scope with `import sqlContext.implicits._`); it converts an RDD to a DataFrame. What you get from Kafka here is not an RDD but a DStream. To convert it to a DataFrame you need the transform API or the foreachRDD API to process each RDD inside the DStream. Below I use foreachRDD to convert each RDD to a DataFrame:
val data = KafkaUtils.createStream(ssc, zkQuorum, "GroupName", topics).map(_._2)
data.foreachRDD { rdd =>
  // toDF needs the SQLContext implicits in scope for each RDD
  import sqlContext.implicits._
  val df = rdd.map(_.split(",")).map(fields => (fields(0), fields(1))).toDF()
}
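Applying the same pattern to the question's WeatherEvent stream could look like the sketch below. This is a sketch under assumptions, not a drop-in fix: it assumes `ssc`, `kafkaParams`, and a SparkSession named `spark` already exist as in the question, assumes records are comma-separated (one event per Kafka record), and trims WeatherEvent to four fields for brevity.

```scala
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

// Trimmed version of the question's case class, for illustration only
case class WeatherEvent(longitude: Double, latitude: Double,
                        country: String, sunrise: Int)

val inputStream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Array("test"), kafkaParams))

inputStream.foreachRDD { rdd =>
  // toDF is only available on RDDs, so the conversion happens here,
  // inside foreachRDD, with the implicits imported per batch
  import spark.implicits._
  val events = rdd.map { record =>
    val f = record.value.split(",")
    WeatherEvent(f(0).toDouble, f(1).toDouble, f(2), f(3).toInt)
  }
  val df = events.toDF("longitude", "latitude", "country", "sunrise")
  df.show()
}
```

Because WeatherEvent is a case class, `events.toDF()` with no arguments would also work and would take the column names from the case-class fields; the explicit names are only needed if you want to rename them.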