在DStream.transform()中使用SQL而不是Spark Streaming



有一些在foreachRDD()中使用SQL over Spark Streaming的示例。但是如果我想在tranform():中使用SQL

case class AlertMsg(host:String, count:Int, sum:Double)
val lines = ssc.socketTextStream("localhost", 8888)
lines.transform( rdd => {
  if (rdd.count > 0) {
    val t = sqc.jsonRDD(rdd)
    t.registerTempTable("logstash")
    val sqlreport = sqc.sql("SELECT host, COUNT(host) AS host_c, AVG(lineno) AS line_a FROM logstash WHERE path = '/var/log/system.log' AND lineno > 70 GROUP BY host ORDER BY host_c DESC LIMIT 100")
    sqlreport.map(r => AlertMsg(r(0).toString,r(1).toString.toInt,r(2).toString.toDouble))
  } else {
    rdd
  }
}).print()

我得到这样的错误:

[error]/Users/raochelin/Downloads/spark-1.2.0-bin-hadoop2.4/logstash/src/main/scala/logstash.scala:52:方法转换没有类型参数:(transformFunc:org.apache.spark.rdd.rdd[String]=>org.apache.sspark.rdd.rdd[U])[String]=>org.apache.spark.rdd.rdd[_>:LogStash.AlertMsg with String<:java.io.Serializable])[error]-因为---[error]参数表达式的类型与形式参数类型不兼容;[error]找到:org.apache.spark.rdd.rdd[String]=>org.apache.sspark.rdd.rdd[_>:LogStash.AlertMsg with String<:java.io.Serializable][error]必需:org.apache.spark.rdd.rdd[String]=>org.apache.sspark.rdd.rdd[?U][error]行.transform(rdd=>{[错误]^[error]发现一个错误[error](编译:编译)编译失败

似乎只有我使用sqlreport.map(r => r.toString)才能正确使用?

dstream.transform采用函数transformFunc: (RDD[T]) ⇒ RDD[U]在这种情况下,if必须在条件的两次评估中产生相同的类型,但事实并非如此:

if (count == 0) => RDD[String]
if (count > 0) => RDD[AlertMsg]

在这种情况下,删除if rdd.count ...的优化,这样您就有了唯一的转换路径。

相关内容

  • 没有找到相关文章

最新更新