我在 Kafka 中收到了 1 条消息,由几个独立的 JSON 行组成。 我想将此消息流式传输到 HDFS。 问题是,我的代码只保存了第一个 JSON,而忽略了其余的。
示例 1 Kafka 消息(不是多条消息):
{"field": "1"}
{"field": "2"}
{"field": "3"}
Scala 代码的一部分:
val stream = KafkaSource.kafkaStream[String, String, StringDecoder, StringDecoder](
streamingContext, brokers, new ZooKeeperOffsetsStore(zkQuorum, zkPath), topic)
stream.foreachRDD(rdd => {
if (!rdd.isEmpty) {
val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
val df = spark.sqlContext.read.format(rdd.map(m => m._2))
df.write.mode(SaveMode.Append).format("json").save(outputPath)
}
})
特定的解决方案在于我需要映射所有行的rdd.map(m => m._2)
部分,而不仅仅是第一行。 在我看来,rdd
本身已经被切割并且不包含其余的 JSON 行。
我通过使用文本而不是 JSON 来解决它。 主要区别在于toDF()
转换:
stream.foreachRDD(rdd => {
if (!rdd.isEmpty) {
//works as .txt file:
rdd.map(m => m._2).toDF().coalesce(1).write.mode(SaveMode.Append).format("text").save(outputPath)
}
})