Spark 流式传输 csv 到数据帧



我有一个这种格式的流式csv数据集

2,C4653,C5030
2,C5782,C16712
6,C1191,C419
15,C3380,C22841
18,C2436,C5030

我正在尝试获取 Dstream 并将其转换为数据帧,我应该在其中将每个字段作为列获取。 像这样的东西。

col1   col2   col3
 2     C4653  C5030
 2     C5782  C16712   

等等。

我正在使用以下代码,但无法使其工作。这是我正在使用的代码。

  val messages = KafkaUtils.createDirectStream[String, String,    StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet)
val lines = messages.map(_._2)
val seperator = lines.map(_.split(","))
lines.foreachRDD { rdd =>
// Get the singleton instance of SparkSession
val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
import spark.implicits._
// Convert RDD[String] to DataFrame
val wordsDataFrame = rdd.map(_.split(",")).toDF().show();
}

得到以下内容作为我正在使用的代码的输出。

+-----------------+
|            value|
+-----------------+
|[2, C4653, C5030]|
+-----------------+

但是,我正在尝试将其分成三列。请帮忙。

你可以尝试这样的事情。

val wordsDataFrame = rdd.map { record => {
              val recordArr = record.split(",")
              (recordArr(0),recordArr(1),recordArr(2))
            } }.toDF("col1","col2","col3")

请提供带有 toDF 的架构。像这样的东西val wordsDataFrame = rdd.map(_.split(",")).toDF("col1","col2","col3").show()应该可以工作

相关内容

  • 没有找到相关文章

最新更新