使用 Flink 从 Kafka 解析 json 格式的字符串



我想做的是读取json格式的字符串,例如

{"a":1, "b":2}

使用 Flink,然后通过其键提取特定值,例如 1。

参考这里: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/kafka.html

我所做的是:

val params = ParameterTool.fromArgs(args)
val env = StreamExecutionEnvironment.getExecutionEnvironment
val kafkaConsumer = new FlinkKafkaConsumer010(
params.getRequired("input-topic"),
new JSONKeyValueDeserializationSchema(false),
params.getProperties
)
val messageStream = env.addSource(kafkaConsumer)

但我不太确定如何前进。在上面的链接中,它说我可以使用objectNode.get(“field”).as(Int/String/…)()按键提取特定值,但我想知道我该怎么做?

或者可以有一种完全不同的方式来实现我想要的?

谢谢!

对来自 Kafka 的数据应用数据转换,如下所示:

messageStream.map(new MapFunction<ObjectNode, Object>() {
@Override
public Object map(ObjectNode value) throws Exception {
value.get("field").as(...)
}
})

相关内容

  • 没有找到相关文章

最新更新