JSON解码Base64数据流Apache Flink



需要一些建议,我已经使用scala创建了一个flink作业来消费来自Kafka的消息。但是消息被压缩成base64编码。我试过这个代码

val x_stream: DataStream[ObjectNode] = env
.addSource(
new FlinkKafkaConsumer010[ObjectNode](parameters.get("kafka.topic.source"),
new JsonNodeDeserializationSchema(),
kfk_props
).setStartFromEarliest()
).name("Topic Test")).rebalance

代码直接失败,因为它不是有效的Json格式。

,然后我尝试使用SimpleStringSchema(),如下面的代码

val x_stream: DataStream[String] = env
.addSource(
new FlinkKafkaConsumer010[String](parameters.get("kafka.topic.source"),
new SimpleStringSchema(),
kfk_props
).setStartFromEarliest()
).name("Topic Test")).rebalance

kafka消息消费完美,但输出就像下面

.....
Br?G"p)0?p?AF??g}?Ly?@?
??>??j?)??);?E?]<d╚? ?-?@?g?????'2???�?�o???r?z????Q$????p    F╔?7?yx+_'v?2???K&??O??c??D,c0F2??ny[?=??%?/?M1:???bq?yHt"A??5???

我如何将此数据解码为有效的JSON?

最诚挚的问候,

你应该实现KafkaDeserializationSchema接口来定制解析逻辑。

首先解码base64并转换为字节流,然后生成字符串对象,最后解析为json

最新更新