将键值对的 RDD 保存到 CSV 文件



我有一个键值对的RDD,我想将其保存为CSV文件。

我编写了这段代码来从HDFS的一系列文件中获取RDD。

val result = sc.sequenceFile[String,String](filenames)
val rdd_j= result.map(x => x._2)
rdd_j.take(1).foreach(println)

这为我提供了键值对的输出。下面是输出。

{"lat":-37.676842,"lon":144.899414,"geoHash8":"r1r19m0s","adminRegionId":2344705 }

像这样的行有很多。

现在我想将所有行保存到一个CSV中,键作为列,它们的值作为单元格值。此外,某些行中可能缺少某些键。请帮忙!

如果所有预期的列都是已知的,则可以将数据转换为数据帧并使用"from_json"函数提取:

val value = "{"lat":-37.676842,"lon":144.899414,"geoHash8":"r1r19m0s","adminRegionId":2344705 }"
val rdd_j = sparkContext.parallelize(Seq(value))
// schema - other expected columns can be added here
val schema = StructType(
Seq(
StructField(name = "lat", dataType = DoubleType, nullable = true),
StructField(name = "lon", dataType = DoubleType, nullable = true)
)
)
// action
val df = rdd_j.toDF("value")
val result = df
.withColumn("fromJson", from_json($"value", schema))
.select($"fromJson.*")
result.show(false)
result.write.csv("outputPath")

输出:

+----------+----------+
|lat       |lon       |
+----------+----------+
|-37.676842|144.899414|
+----------+----------+

PS当模式未知时,可以使用简单的分配:

val result=spark.read.json(rdd_j.toDS())

最新更新