如何在齐柏林飞来中使用Spark SQL查询Spark StreamingContext



我正在尝试使用Spark SQL使用Zeppelin来查询来自Kafka的数据,以进行实时趋势分析,但没有成功。

这是我在Zeppelin中运行的简单代码片段

//Load Dependency
%dep 
    z.reset()
    z.addRepo("Spark Packages Repo").url("http://repo1.maven.org/maven2/")
    z.load("org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.1")
    z.load("org.apache.spark:spark-core_2.11:2.0.1")
    z.load("org.apache.spark:spark-sql_2.11:2.0.1")
    z.load("org.apache.spark:spark-streaming_2.11:2.0.1"
//simple streaming 
%spark
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import _root_.kafka.serializer.StringDecoder
import org.apache.spark.sql.SparkSession
val conf = new SparkConf()
    .setAppName("clickstream")
    .setMaster("local[*]")
    .set("spark.streaming.stopGracefullyOnShutdown", "true")
    .set("spark.driver.allowMultipleContexts","true")

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config(conf)
  .getOrCreate()
val ssc = new StreamingContext(conf, Seconds(1))
val topicsSet = Set("timer")
val kafkaParams = Map[String, String]("metadata.broker.list" -> "192.168.25.1:9091,192.168.25.1:9092,192.168.25.1:9093")
val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
                                   ssc, kafkaParams, topicsSet).map(_._2)
lines.window(Seconds(60)).foreachRDD{ rdd =>
    val clickDF = spark.read.json(rdd) //doesn't have to be json
    clickDF.createOrReplaceTempView("testjson1")
    //olderway
    //clickDF.registerTempTable("testjson2")
    clickDF.show
}
lines.print()
ssc.start()
ssc.awaitTermination()

我能够打印每个KAFKA消息,但是当我运行简单的SQL %sql select * from testjson1 // or testjson2时,我会收到以下错误

java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:347)
at scala.None$.get(Option.scala:345)
at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:343)
at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:646)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:281)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

在本文中,正在查询流媒体数据(带有Twitter示例)。因此,我认为Kafka流媒体应该有可能。所以我想,也许我做错了什么或错过了某些观点?

欢迎任何想法,建议,建议

错误消息不会说明缺少临时视图。错误消息告诉您,类型无提供名称为" get"的元素。

在调用动作时,使用SPARK进行基于RDD的计算。因此,直到您创建临时表的地步都没有执行计算。当您在表上执行查询时,所有计算将执行。如果您的表不存在,您将收到另一个错误消息。

也许可以打印Kafka消息,但是您的例外告诉您,没有实例不知道"获取"。因此,我相信您的源JSON数据包含没有数据的项目,并且这些项目无需表示,因此在Spark执行计算时导致执行。

我建议您通过测试是否与不包含空的JSON元素的示例数据一起测试是否可以验证您的解决方案。

相关内容

  • 没有找到相关文章