从scala中的spark数据帧创建Json



目前,我们正在将一个spark数据帧转换为JSON字符串,以发送到kafka。

在这个过程中,我们做了两次toJSON,它为内部json插入\。

代码段:

val df=spark.sql("select * from dB.tbl")
val bus_dt="2022-09-23" 
case class kafkaMsg(busDate:String,msg:String)

假设我的df有两列作为ID STATUS,这将构成我的kafka消息的内部json。

JSON是为msg创建的,并应用于case类。

val rdd=df.toJSON.rdd.map(msg=>kafkaMsg(busDate,msg))

该步骤的输出:

kafkaMsg(2022-09-23,{"id":1,"status":"active"})

现在,为了将busDate和msg作为JSON发送到kafka,再次应用了toJSON。

val df1=spark.createDataFrame(rdd).toJSON

输出为:

{"busDate":"2022-09-23","msg":"{"id":1,"status":"active"}"}

内部JSON具有\,这不是消费者所期望的。

预期JSON:

{"busDate":"2022-09-23","msg":{"id":1,"status":"active"}}

我如何在没有\的情况下创建这个json并发送到kafka。

请注意,消息值各不相同,无法映射到架构。

您的msg被转义,因为它已经是一个字符串。因此,当您转换为JSON时,您要字符串化一个字符串。。。

JSON可以表示为Map[String, ?],所以如果您的输入数据还没有模式,请定义它

以PySpark为例。

scm = StructType([
StructField('busDate', StringType(), nullable=False),
StructField('msg', MapType(StringType(), StringType()), nullable=False)
])
sdf = spark.createDataFrame([
('2022-09-23', {"id":1,"status":"active"}),
], schema=scm)

模式-请注意,msg不是一个字符串,而是一个Map[String, String]。不,你不能有多个值类型-Spark SQL和MapType带有字符串键和任何值

root
|-- busDate: string (nullable = false)
|-- msg: map (nullable = false)
|    |-- key: string
|    |-- value: string (valueContainsNull = true)

作为JSON-您不需要Jackson,也不需要破解RDD。。。

kafkaDf = sdf.selectExpr("to_json(struct(*)) as value")
kafkaDf.show(truncate=False)

没有逃脱。。。

请注意,id类型已转换。如果这不是你想要的,那么你需要使用msg : StructType而不是MapType,并给出id : IntegerType。(显然,这假设数据帧中的所有记录都是一致类型的(

+-----------------------------------------------------------+
|value                                                      |
+-----------------------------------------------------------+
|{"busDate":"2022-09-23","msg":{"id":"1","status":"active"}}|
+-----------------------------------------------------------+

您也可以拔出钥匙(切换为使用spark.sql.functions(

kafkaDf = sdf.select(
f.col("msg.id").cast("int").alias('key'), 
f.to_json(f.struct('*')).alias('value')
)
kafkaDf.printSchema()
kafkaDf.show(truncate=False)
root
|-- key: integer (nullable = true)
|-- value: string (nullable = true)
+---+-----------------------------------------------------------+
|key|value                                                      |
+---+-----------------------------------------------------------+
|1  |{"busDate":"2022-09-23","msg":{"id":"1","status":"active"}}|
+---+-----------------------------------------------------------+

然后你可以使用kafkaDf.write.format("kafka"),作为正常的


或者,如果您想将字符串信息包装在单个字段中,而不是键值对中,那么您的Kafka消费者将需要自己处理,例如对记录和内部字符串(JSON值(进行双重反序列化。

相关内容

  • 没有找到相关文章

最新更新