我在齐柏林飞艇笔记本中保存火花流消耗的卡夫卡消息时遇到问题。
我的代码是:
case class Message(id: Long, message: String, timestamp: Long) extends Serializable
val ssc = new StreamingContext(sc, Seconds(2))
val messagesStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc,
Map("zookeeper.connect" -> "localhost:2181", "group.id" -> "test-consumer-group"),
Map("test" -> 4),
StorageLevel.MEMORY_ONLY)
.map { case (k, v) => implicit val formats = DefaultFormats; JsonMethods.parse(v).extract[Message] }
.filter(_.id % 2 == 0)
val mes = messagesStream.window(Seconds(10))
mes
.map(m => Message(m.id, m.message, m.timestamp))
.foreachRDD( rdd => rdd.toDF.registerTempTable("messages"))
ssc.start()
当我运行%sql select * from messages
时,它不显示任何数据,但表已定义。如果我在 Cassandra 上将保存更改为 tempTable,它会正确保存并显示数据。不明白为什么会这样。
感谢您的帮助。
这就是问题所在。让我们首先回顾一下foreachRDD运算符的定义:
foreachRDD
不是按照其预期使用方式使用的。它是将函数 func 应用于从流生成的每个 RDD 的最通用的输出运算符。此函数应将每个RDD中的数据推送到外部系统,例如将RDD保存到文件,或通过网络将其写入数据库。请注意,函数 func 在运行流应用程序的驱动程序进程中执行,并且通常具有 RDD 操作,这些操作将强制计算流 RDD。
因此,您的代码实际发生的情况如下:
由于 DStreams 由输出操作延迟执行,就像 RDD 由 RDD 操作延迟执行一样。具体来说,DStream 输出操作中的 RDD 操作强制处理接收的数据。因此,如果你的应用程序没有任何输出操作,而你没有,或者像dstream.foreachRDD()这样的输出操作没有任何RDD操作,那么什么都不会被执行。系统将简单地接收数据并将其丢弃。
因此,每次执行registerTempTable
时都会丢弃RDD数据,因此SQL查询会给出空结果。
要解决您的问题,您需要将数据保存在某个地方(Cassandra 是一个不错的选择),然后对其进行查询。
如果你想避免另一个集群:另一种解决方案是将RDD转换为行,然后转换为DF,然后将其作为镶木地板或ORC保存到HDFS,并可选择附加文件,例如:
write.format("orc").mode("overwrite").partitionBy("id").save("/tmp/data")
我只是想知道AWS博主如何能够直接在临时表上执行分析[在此处输入链接描述][1]
好消息是结构化流媒体即将到来:)
[1]:AWS 博客:https://blogs.aws.amazon.com/bigdata/post/Tx3K805CZ8WFBRP/Analyze-Realtime-Data-from-Amazon-Kinesis-Streams-Using-Zeppelin-and-Spark-Stream