我有一个键值对的RDD,我想将其保存为CSV文件。
我编写了这段代码来从HDFS的一系列文件中获取RDD。
val result = sc.sequenceFile[String,String](filenames)
val rdd_j= result.map(x => x._2)
rdd_j.take(1).foreach(println)
这为我提供了键值对的输出。下面是输出。
{"lat":-37.676842,"lon":144.899414,"geoHash8":"r1r19m0s","adminRegionId":2344705 }
像这样的行有很多。
现在我想将所有行保存到一个CSV中,键作为列,它们的值作为单元格值。此外,某些行中可能缺少某些键。请帮忙!
如果所有预期的列都是已知的,则可以将数据转换为数据帧并使用"from_json"函数提取:
val value = "{"lat":-37.676842,"lon":144.899414,"geoHash8":"r1r19m0s","adminRegionId":2344705 }"
val rdd_j = sparkContext.parallelize(Seq(value))
// schema - other expected columns can be added here
val schema = StructType(
Seq(
StructField(name = "lat", dataType = DoubleType, nullable = true),
StructField(name = "lon", dataType = DoubleType, nullable = true)
)
)
// action
val df = rdd_j.toDF("value")
val result = df
.withColumn("fromJson", from_json($"value", schema))
.select($"fromJson.*")
result.show(false)
result.write.csv("outputPath")
输出:
+----------+----------+
|lat |lon |
+----------+----------+
|-37.676842|144.899414|
+----------+----------+
PS当模式未知时,可以使用简单的分配:
val result=spark.read.json(rdd_j.toDS())