如何存储来自 kafka 的 JSONLines RDD 消息



我在 Kafka 中收到了 1 条消息,由几个独立的 JSON 行组成。 我想将此消息流式传输到 HDFS。 问题是,我的代码只保存了第一个 JSON,而忽略了其余的。

示例 1 Kafka 消息(不是多条消息):

{"field": "1"}
{"field": "2"}
{"field": "3"}

Scala 代码的一部分:

 val stream = KafkaSource.kafkaStream[String, String, StringDecoder, StringDecoder](
      streamingContext, brokers, new ZooKeeperOffsetsStore(zkQuorum, zkPath), topic)
    stream.foreachRDD(rdd => {
      if (!rdd.isEmpty) {
        val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
        val df = spark.sqlContext.read.format(rdd.map(m => m._2))
        df.write.mode(SaveMode.Append).format("json").save(outputPath)
      }
    })

特定的解决方案在于我需要映射所有行的rdd.map(m => m._2)部分,而不仅仅是第一行。 在我看来,rdd本身已经被切割并且不包含其余的 JSON 行。

我通过使用文本而不是 JSON 来解决它。 主要区别在于toDF()转换:

stream.foreachRDD(rdd => {
      if (!rdd.isEmpty) {        
        //works as .txt file: 
        rdd.map(m => m._2).toDF().coalesce(1).write.mode(SaveMode.Append).format("text").save(outputPath)

      }
    })

相关内容

  • 没有找到相关文章

最新更新