Flink: deserializing JSON from Kafka



I'm trying to read a JSON message from a Kafka topic with Flink.

I'm using Kafka 2.4.1 and Flink 1.10.

For my consumer I have:

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;

FlinkKafkaConsumer<ObjectNode> sensorConsumer = new FlinkKafkaConsumer<>(KAFKA_TOPIC_INPUT,
        new JSONKeyValueDeserializationSchema(false), properties);

When I use SimpleStringSchema I get the JSON as text, which is fine, but with JSONKeyValueDeserializationSchema I get:

Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'sensor_5': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')

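sensor_5 is the message key in the topic. I suspect I need to do something extra to get the JSON out of the Kafka message value and handle the key separately, but I'm not sure how.

Any suggestions?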

The JSON structure is:

{"value": 1.0, "timestamp": "2020-05-01 14:00:00.000000"}

and it is produced with:

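# Python 3
import json
from confluent_kafka import Producer

dict_obj = {"value": 1.0, "timestamp": "2020-05-01 14:00:00.000000"}
producer = Producer({'bootstrap.servers': "kafka:9092"})
# The key is sent as the plain string sensor_5, not as a JSON document
producer.produce(topic='sensors-raw', key='sensor_5', value=json.dumps(dict_obj))
producer.flush()  # block until the queued message has actually been delivered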

Basically, if you look at the source code of JSONKeyValueDeserializationSchema, you can see that it does the following:

if (mapper == null) {
    mapper = new ObjectMapper();
}
ObjectNode node = mapper.createObjectNode();
if (record.key() != null) {
    node.set("key", mapper.readValue(record.key(), JsonNode.class));
}
if (record.value() != null) {
    node.set("value", mapper.readValue(record.value(), JsonNode.class));
}
if (includeMetadata) {
    node.putObject("metadata")
        .put("offset", record.offset())
        .put("topic", record.topic())
        .put("partition", record.partition());
}
return node;

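So the schema expects your key to be JSON rather than a plain string: mapper.readValue(record.key(), JsonNode.class) tries to parse the raw key bytes as a JSON document, and an unquoted token like sensor_5 is not valid JSON, which is exactly the "Unrecognized token 'sensor_5'" error you see. I think the best and simplest solution is to write your own implementation that treats the key as a String.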
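To see the failure in isolation, here is a minimal sketch that parses key bytes the same way the schema does, with ObjectMapper.readValue(..., JsonNode.class). It assumes the flink-shaded-jackson classes from the question's imports are on the classpath; the class KeyParsingDemo and the helper parsesAsJson are hypothetical names used only for illustration.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

public class KeyParsingDemo {

    // Hypothetical helper: returns true if the key bytes parse as a JSON document,
    // i.e. the same call JSONKeyValueDeserializationSchema makes on record.key().
    static boolean parsesAsJson(ObjectMapper mapper, String key) {
        try {
            mapper.readValue(key.getBytes(StandardCharsets.UTF_8), JsonNode.class);
            return true;
        } catch (IOException e) {
            // JsonParseException ("Unrecognized token ...") is a subclass of IOException
            return false;
        }
    }

    public static void main(String[] args) {
        ObjectMapper mapper = new ObjectMapper();
        // Bare token, as sent by the Python producer: not valid JSON
        System.out.println(parsesAsJson(mapper, "sensor_5"));
        // Quoted token: a valid JSON string value
        System.out.println(parsesAsJson(mapper, "\"sensor_5\""));
    }
}
```

Running it prints false and then true. This also suggests an alternative to a custom schema: if the producer sent the key as JSON (in Python, json.dumps('sensor_5') yields "sensor_5" including the quotes), the stock JSONKeyValueDeserializationSchema should accept it.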

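If you don't need the key in your records, you can implement DeserializationSchema instead of KeyedDeserializationSchema; a plain DeserializationSchema only receives the message value, so the key is never parsed.

For example: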

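import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass;

import java.io.IOException;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;

// Deserializes only the Kafka message value as JSON; the key is ignored.
public class JSONValueDeserializationSchema implements DeserializationSchema<ObjectNode> {
    private static final long serialVersionUID = -1L;

    // Created lazily on the task manager, so it does not need to be serialized
    private transient ObjectMapper mapper;

    @Override
    public ObjectNode deserialize(byte[] message) throws IOException {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        ObjectNode node = mapper.createObjectNode();
        if (message != null) {
            // Wrap the parsed value under "value", mirroring JSONKeyValueDeserializationSchema
            node.set("value", mapper.readValue(message, JsonNode.class));
        }
        return node;
    }

    @Override
    public boolean isEndOfStream(ObjectNode nextElement) {
        return false; // Kafka topics are unbounded
    }

    @Override
    public TypeInformation<ObjectNode> getProducedType() {
        return getForClass(ObjectNode.class);
    }
}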

If you want to include the key in your records as well, you can implement KeyedDeserializationSchema, as Dominik Wosiński mentioned in his answer.
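A minimal sketch of such a keyed schema, assuming Flink 1.10 and the shaded Jackson classes used above. Rather than KeyedDeserializationSchema (deprecated in Flink 1.10), it implements KafkaDeserializationSchema, which receives the whole ConsumerRecord, matching the record.key() and record.offset() calls in the source shown earlier. It copies the structure of JSONKeyValueDeserializationSchema but stores the key as plain UTF-8 text instead of parsing it as JSON. The class name StringKeyJsonValueDeserializationSchema is hypothetical.

```java
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Hypothetical class: like JSONKeyValueDeserializationSchema, but the key is kept as a String.
public class StringKeyJsonValueDeserializationSchema implements KafkaDeserializationSchema<ObjectNode> {
    private static final long serialVersionUID = 1L;

    private final boolean includeMetadata;
    private transient ObjectMapper mapper; // created lazily on first use

    public StringKeyJsonValueDeserializationSchema(boolean includeMetadata) {
        this.includeMetadata = includeMetadata;
    }

    @Override
    public ObjectNode deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        ObjectNode node = mapper.createObjectNode();
        if (record.key() != null) {
            // Store the raw key (e.g. sensor_5) as text; it never goes through the JSON parser
            node.put("key", new String(record.key(), StandardCharsets.UTF_8));
        }
        if (record.value() != null) {
            // The value is still expected to be a JSON document
            node.set("value", mapper.readValue(record.value(), JsonNode.class));
        }
        if (includeMetadata) {
            node.putObject("metadata")
                .put("offset", record.offset())
                .put("topic", record.topic())
                .put("partition", record.partition());
        }
        return node;
    }

    @Override
    public boolean isEndOfStream(ObjectNode nextElement) {
        return false;
    }

    @Override
    public TypeInformation<ObjectNode> getProducedType() {
        return TypeExtractor.getForClass(ObjectNode.class);
    }
}
```

It should plug into the consumer from the question as new FlinkKafkaConsumer<>(KAFKA_TOPIC_INPUT, new StringKeyJsonValueDeserializationSchema(false), properties). Each element then has the shape {"key":"sensor_5","value":{"value":1.0,"timestamp":"2020-05-01 14:00:00.000000"}}, so node.get("key").asText() gives the sensor id and node.get("value").get("value").asDouble() gives the reading.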
