我遵循Spark KafkaWordCount。scala的例子来处理Kafka流。为了获得编写计算逻辑的简单方法,我还使用了Spark-SQL。
问题是我发现每个SQL查询花费300 - 400毫秒,即使是空流!当我有2秒的时间窗口时,这个代价太大了。同样的逻辑由Scala代码编写只花费了10 - 12ms 。
Spark-SQL版本:
def processBySQL(persons: RDD[Person], sqc: SQLContext) = {
val ts = System.currentTimeMillis
import sqc.implicits._
val df = persons.toDF()
df.registerTempTable("tb_person")
sqc.cacheTable("tb_person")
sqc.sql("SELECT count(1), age FROM tb_person GROUP BY age").collect().foreach(println)
sqc.uncacheTable("tb_person")
println("[SQL]: " + (System.currentTimeMillis - ts) + " ms") //300 - 400ms
}
Scala版本:
def processByCode(persons: RDD[Person]) = {
val ts = System.currentTimeMillis
persons.groupBy(_.age)
.map(group => {
val (age, items) = group
val size = items.size
(items.last.name, items.size)
}).collect().foreach(println)
println("[CODE]: " + (System.currentTimeMillis - ts) + " ms") // 10 - 12ms
}
完整的测试代码在这里:https://gist.github.com/nonlyli/e247a576b275cd7b3d88
对这个问题有什么想法吗?
更新2015.09.21:升级到spark 1.5.0,测试结果没有太大差异
sql和非sql不做同样的事情。例如,在SQL版本中,你调用cacheTable