There are some examples of using SQL over Spark Streaming inside foreachRDD(). But what if I want to use SQL inside transform():
case class AlertMsg(host: String, count: Int, sum: Double)

val lines = ssc.socketTextStream("localhost", 8888)
lines.transform( rdd => {
  if (rdd.count > 0) {
    val t = sqc.jsonRDD(rdd)
    t.registerTempTable("logstash")
    val sqlreport = sqc.sql("SELECT host, COUNT(host) AS host_c, AVG(lineno) AS line_a FROM logstash WHERE path = '/var/log/system.log' AND lineno > 70 GROUP BY host ORDER BY host_c DESC LIMIT 100")
    sqlreport.map(r => AlertMsg(r(0).toString, r(1).toString.toInt, r(2).toString.toDouble))
  } else {
    rdd
  }
}).print()
I get an error like this:

[error] /Users/raochelin/Downloads/spark-1.2.0-bin-hadoop2.4/logstash/src/main/scala/logstash.scala:52: no type parameters for method transform: (transformFunc: org.apache.spark.rdd.RDD[String] => org.apache.spark.rdd.RDD[U]) exist so that it can be applied to arguments (org.apache.spark.rdd.RDD[String] => org.apache.spark.rdd.RDD[_ >: LogStash.AlertMsg with String <: java.io.Serializable])
[error]  --- because ---
[error] argument expression's type is not compatible with formal parameter type;
[error]  found   : org.apache.spark.rdd.RDD[String] => org.apache.spark.rdd.RDD[_ >: LogStash.AlertMsg with String <: java.io.Serializable]
[error]  required: org.apache.spark.rdd.RDD[String] => org.apache.spark.rdd.RDD[?U]
[error] lines.transform( rdd => {
[error]                 ^
[error] one error found
[error] (compile:compile) Compilation failed
It seems to work only if I use sqlreport.map(r => r.toString) instead. Why?
dstream.transform takes a function transformFunc: (RDD[T]) ⇒ RDD[U].
In this case, the if must evaluate to the same type in both branches of the conditional, but it does not:

if (count == 0) => RDD[String]
if (count > 0)  => RDD[AlertMsg]

(That is also why mapping with r => r.toString compiles: both branches then produce RDD[String].)
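As a hypothetical minimal illustration (names made up, not from your code), this is the inference at work: the two branches have different element types, so scalac falls back to an existential least upper bound, which transform cannot unify with a single RDD[U]:

val out = if (rdd.count > 0)
  rdd.map(s => AlertMsg(s, 1, 1.0))  // RDD[AlertMsg]
else
  rdd                                // RDD[String]
// inferred: RDD[_ >: AlertMsg with String <: java.io.Serializable]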
In that case, remove the if (rdd.count > 0) optimization so that you have a single transformation path, as sketched below.
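A minimal sketch of that single-path version, assuming the same ssc and sqc (SQLContext) setup as in your snippet:

case class AlertMsg(host: String, count: Int, sum: Double)

val lines = ssc.socketTextStream("localhost", 8888)
lines.transform { rdd =>
  // Infer a schema from the JSON lines and register it for SQL.
  val t = sqc.jsonRDD(rdd)
  t.registerTempTable("logstash")
  val sqlreport = sqc.sql(
    "SELECT host, COUNT(host) AS host_c, AVG(lineno) AS line_a " +
    "FROM logstash WHERE path = '/var/log/system.log' AND lineno > 70 " +
    "GROUP BY host ORDER BY host_c DESC LIMIT 100")
  // Every batch now yields an RDD[AlertMsg], so transform infers U = AlertMsg.
  sqlreport.map(r => AlertMsg(r(0).toString, r(1).toString.toInt, r(2).toString.toDouble))
}.print()

If you still want to skip the SQL work on empty batches, keep the if but make the else branch return the same element type, e.g. rdd.sparkContext.emptyRDD[AlertMsg] (assuming your Spark version provides SparkContext.emptyRDD), so that both branches are RDD[AlertMsg].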