目前,我们正在将一个spark数据帧转换为JSON字符串,以发送到kafka。
在这个过程中,我们做了两次toJSON,它为内部json插入\。
代码段:
val df=spark.sql("select * from dB.tbl")
val bus_dt="2022-09-23"
case class kafkaMsg(busDate:String,msg:String)
假设我的df有两列作为ID STATUS,这将构成我的kafka消息的内部json。
JSON是为msg创建的,并应用于case类。
val rdd=df.toJSON.rdd.map(msg=>kafkaMsg(busDate,msg))
该步骤的输出:
kafkaMsg(2022-09-23,{"id":1,"status":"active"})
现在,为了将busDate和msg作为JSON发送到kafka,再次应用了toJSON。
val df1=spark.createDataFrame(rdd).toJSON
输出为:
{"busDate":"2022-09-23","msg":"{"id":1,"status":"active"}"}
内部JSON具有\,这不是消费者所期望的。
预期JSON:
{"busDate":"2022-09-23","msg":{"id":1,"status":"active"}}
我如何在没有\的情况下创建这个json并发送到kafka。
请注意,消息值各不相同,无法映射到架构。
您的msg
被转义,因为它已经是一个字符串。因此,当您转换为JSON时,您要字符串化一个字符串。。。
JSON可以表示为Map[String, ?]
,所以如果您的输入数据还没有模式,请定义它
以PySpark为例。
scm = StructType([
StructField('busDate', StringType(), nullable=False),
StructField('msg', MapType(StringType(), StringType()), nullable=False)
])
sdf = spark.createDataFrame([
('2022-09-23', {"id":1,"status":"active"}),
], schema=scm)
模式-请注意,msg
不是一个字符串,而是一个Map[String, String]
。不,你不能有多个值类型-Spark SQL和MapType带有字符串键和任何值
root
|-- busDate: string (nullable = false)
|-- msg: map (nullable = false)
| |-- key: string
| |-- value: string (valueContainsNull = true)
作为JSON-您不需要Jackson,也不需要破解RDD。。。
kafkaDf = sdf.selectExpr("to_json(struct(*)) as value")
kafkaDf.show(truncate=False)
没有逃脱。。。
请注意,id
类型已转换。如果这不是你想要的,那么你需要使用msg : StructType
而不是MapType
,并给出id : IntegerType
。(显然,这假设数据帧中的所有记录都是一致类型的(
+-----------------------------------------------------------+
|value |
+-----------------------------------------------------------+
|{"busDate":"2022-09-23","msg":{"id":"1","status":"active"}}|
+-----------------------------------------------------------+
您也可以拔出钥匙(切换为使用spark.sql.functions
(
kafkaDf = sdf.select(
f.col("msg.id").cast("int").alias('key'),
f.to_json(f.struct('*')).alias('value')
)
kafkaDf.printSchema()
kafkaDf.show(truncate=False)
root
|-- key: integer (nullable = true)
|-- value: string (nullable = true)
+---+-----------------------------------------------------------+
|key|value |
+---+-----------------------------------------------------------+
|1 |{"busDate":"2022-09-23","msg":{"id":"1","status":"active"}}|
+---+-----------------------------------------------------------+
然后你可以使用kafkaDf.write.format("kafka")
,作为正常的
或者,如果您想将字符串信息包装在单个字段中,而不是键值对中,那么您的Kafka消费者将需要自己处理,例如对记录和内部字符串(JSON值(进行双重反序列化。