我有一个从csv读取的数据帧。
CSV:
name,age,pets
Alice,23,dog
Bob,30,dog
Charlie,35,
Reading this into a DataFrame called myData:
+-------+---+----+
| name|age|pets|
+-------+---+----+
| Alice| 23| dog|
| Bob| 30| dog|
|Charlie| 35|null|
+-------+---+----+
现在,我想使用myData.toJSON
将此数据帧的每一行转换为 json。我得到的是以下json。
{"name":"Alice","age":"23","pets":"dog"}
{"name":"Bob","age":"30","pets":"dog"}
{"name":"Charlie","age":"35"}
我希望第 3 行的 json 包含空值。 例如。
{"name":"Charlie","age":"35", "pets":null}
但是,这似乎是不可能的。我通过代码进行了调试,看到 Spark 的org.apache.spark.sql.catalyst.json.JacksonGenerator
类具有以下实现
private def writeFields(
row: InternalRow, schema: StructType, fieldWriters:
Seq[ValueWriter]): Unit = {
var i = 0
while (i < row.numFields) {
val field = schema(i)
if (!row.isNullAt(i)) {
gen.writeFieldName(field.name)
fieldWriters(i).apply(row, i)
}
i += 1
}
}
如果一列为空,这似乎是跳过一列。我不太确定为什么这是默认行为,但是有没有办法使用 Spark 的toJSON
在 json 中打印空值?
我正在使用Spark 2.1.0
要使用 Spark 的toJSON
方法在 JSON 中打印空值,可以使用以下代码:
myData.na.fill("null").toJSON
它将为您提供预期的结果:
+-------------------------------------------+
|value |
+-------------------------------------------+
|{"name":"Alice","age":"23","pets":"dog"} |
|{"name":"Bob","age":"30","pets":"dog"} |
|{"name":"Charlie","age":"35","pets":"null"}|
+-------------------------------------------+
我希望它有所帮助!
我已经修改了JacksonGenerator.writeFields函数并包含在我的项目中。 以下是步骤-
1( 在 'src/main/scala/' 中创建包 'org.apache.spark.sql.catalyst.json'
2( 复制杰克逊发电机类
3( 在 '' 包中创建 JacksonGenerator.scala 类并粘贴复制的代码
4( 修改写字段函数
private def writeFields(row: InternalRow, schema: StructType, fieldWriters:Seq[ValueWriter]): Unit = {
var i = 0
while (i < row.numFields) {
val field = schema(i)
if (!row.isNullAt(i)) {
gen.writeFieldName(field.name)
fieldWriters(i).apply(row, i)
}
else{
gen.writeNullField(field.name)
}
i += 1
}}
用Spark 3.0.0测试:
创建 Spark 会话时,请将spark.sql.jsonGenerator.ignoreNullFields
设置为 false。
toJSON
函数内部使用org.apache.spark.sql.catalyst.json.JacksonGenerator
,而又需要org.apache.spark.sql.catalyst.json.JSONOptions
进行配置。 后者包括一个选项ignoreNullFields
. 但是,toJSON
使用默认值,在此特定选项的情况下,默认值取自上面给出的 sql 配置。
配置设置为 false 的示例:
val schema = StructType(Seq(StructField("a", StringType), StructField("b", StringType)))
val rows = Seq(Row("a", null), Row(null, "b"))
val frame = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)
println(frame.toJSON.collect().mkString("n"))
生产
{"a":"a","b":null}
{"a":null,"b":"b"}
import org.apache.spark.sql.types._
import scala.util.parsing.json.JSONObject
def convertRowToJSON(row: Row): String = {
val m = row.getValuesMap(row.schema.fieldNames).filter(_._2 != null)
JSONObject(m).toString()
}