我有一个这种格式的流式csv数据集
2,C4653,C5030
2,C5782,C16712
6,C1191,C419
15,C3380,C22841
18,C2436,C5030
我正在尝试获取 Dstream 并将其转换为数据帧,我应该在其中将每个字段作为列获取。 像这样的东西。
col1 col2 col3
2 C4653 C5030
2 C5782 C16712
等等。
我正在使用以下代码,但无法使其工作。这是我正在使用的代码。
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet)
val lines = messages.map(_._2)
val seperator = lines.map(_.split(","))
lines.foreachRDD { rdd =>
// Get the singleton instance of SparkSession
val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
import spark.implicits._
// Convert RDD[String] to DataFrame
val wordsDataFrame = rdd.map(_.split(",")).toDF().show();
}
我得到以下内容作为我正在使用的代码的输出。
+-----------------+
| value|
+-----------------+
|[2, C4653, C5030]|
+-----------------+
但是,我正在尝试将其分成三列。请帮忙。
你可以尝试这样的事情。
val wordsDataFrame = rdd.map { record => {
val recordArr = record.split(",")
(recordArr(0),recordArr(1),recordArr(2))
} }.toDF("col1","col2","col3")
请提供带有 toDF 的架构。像这样的东西val wordsDataFrame = rdd.map(_.split(",")).toDF("col1","col2","col3").show()
应该可以工作