在齐柏林飞艇笔记本中保存 Spark 流使用的 Kafka 消息



我在齐柏林飞艇笔记本中保存火花流消耗的卡夫卡消息时遇到问题。

我的代码是:

case class Message(id: Long, message: String, timestamp: Long) extends Serializable
   val ssc = new StreamingContext(sc, Seconds(2))
  val messagesStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, 
    Map("zookeeper.connect" -> "localhost:2181", "group.id" -> "test-consumer-group"),
    Map("test" -> 4),
    StorageLevel.MEMORY_ONLY)
    .map { case (k, v) =>  implicit val formats = DefaultFormats; JsonMethods.parse(v).extract[Message] }
    .filter(_.id % 2 == 0)
  val mes =  messagesStream.window(Seconds(10))
  mes
  .map(m => Message(m.id, m.message, m.timestamp))
  .foreachRDD( rdd => rdd.toDF.registerTempTable("messages"))
  ssc.start() 

当我运行%sql select * from messages时,它不显示任何数据,但表已定义。如果我在 Cassandra 上将保存更改为 tempTable,它会正确保存并显示数据。不明白为什么会这样。

感谢您的帮助。

好的,

这就是问题所在。让我们首先回顾一下foreachRDD运算符的定义:

foreachRDD不是按照其预期使用方式使用的。它是将函数 func 应用于从流生成的每个 RDD 的最通用的输出运算符。此函数应将每个RDD中的数据推送到外部系统,例如将RDD保存到文件,或通过网络将其写入数据库。请注意,函数 func 在运行流应用程序的驱动程序进程中执行,并且通常具有 RDD 操作,这些操作将强制计算流 RDD。

因此,您的代码实际发生的情况如下:

由于 DStreams 由输出操作延迟执行,就像 RDD 由 RDD 操作延迟执行一样。具体来说,DStream 输出操作中的 RDD 操作强制处理接收的数据。因此,如果你的应用程序没有任何输出操作,而你没有,或者像dstream.foreachRDD()这样的输出操作没有任何RDD操作,那么什么都不会被执行。系统将简单地接收数据并将其丢弃

因此,每次执行registerTempTable时都会丢弃RDD数据,因此SQL查询会给出空结果。

要解决您的问题,您需要将数据保存在某个地方(Cassandra 是一个不错的选择),然后对其进行查询。

如果你想避免另一个集群:另一种解决方案是将RDD转换为行,然后转换为DF,然后将其作为镶木地板或ORC保存到HDFS,并可选择附加文件,例如:

write.format("orc").mode("overwrite").partitionBy("id").save("/tmp/data")

我只是想知道AWS博主如何能够直接在临时表上执行分析[在此处输入链接描述][1]

好消息是结构化流媒体即将到来:)

[1]:AWS 博客:https://blogs.aws.amazon.com/bigdata/post/Tx3K805CZ8WFBRP/Analyze-Realtime-Data-from-Amazon-Kinesis-Streams-Using-Zeppelin-and-Spark-Stream

最新更新