I'm trying to read a JSON message from a Kafka topic with Flink.
I'm using Kafka 2.4.1 and Flink 1.10.
I've set up my consumer as follows:
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
FlinkKafkaConsumer<ObjectNode> sensorConsumer = new FlinkKafkaConsumer<>(KAFKA_TOPIC_INPUT,
        new JSONKeyValueDeserializationSchema(false), properties);
When I use SimpleStringSchema I get the JSON as text, which is fine, but with JSONKeyValueDeserializationSchema I get:
Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'sensor_5': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
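sensor_5 is the key of the message in the topic. I'm guessing I need to add something else to get the JSON from the Kafka message value and handle the key somehow, but I'm not sure how.
Any suggestions?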
The JSON structure is:
{"value": 1.0, "timestamp": "2020-05-01 14:00:00.000000"}
and the messages are produced with:
# Python 3
import json
from confluent_kafka import Producer
dict_obj = {"value": 1.0, "timestamp": "2020-05-01 14:00:00.000000"}
producer = Producer({'bootstrap.servers': "kafka:9092"})
producer.produce(topic='sensors-raw', key='sensor_5', value=json.dumps(dict_obj))
# Block until queued messages are delivered before the script exits
producer.flush()
Basically, if you look at the source code of JSONKeyValueDeserializationSchema, you can see that it looks like this:
if (mapper == null) {
    mapper = new ObjectMapper();
}
ObjectNode node = mapper.createObjectNode();
if (record.key() != null) {
    node.set("key", mapper.readValue(record.key(), JsonNode.class));
}
if (record.value() != null) {
    node.set("value", mapper.readValue(record.value(), JsonNode.class));
}
if (includeMetadata) {
    node.putObject("metadata")
        .put("offset", record.offset())
        .put("topic", record.topic())
        .put("partition", record.partition());
}
return node;
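So the schema expects your key to be valid JSON as well, not a plain string, which is why sensor_5 fails to parse. I think the best and simplest solution is to write your own implementation that treats the key as a String.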
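To see why the key breaks the schema, the snippet calls mapper.readValue(..., JsonNode.class) on the raw key bytes, just as the excerpt above does. KeyParsingDemo and its method names are hypothetical, and it assumes the shaded Jackson bundled with Flink 1.10 is on the classpath (plain com.fasterxml.jackson should behave the same). A bare sensor_5 is rejected, while the same key wrapped in double quotes parses as a JSON string, which suggests that producing the key as json.dumps('sensor_5') would also avoid the error, at the cost of quoting every key.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

// Hypothetical demo class: reproduces how JSONKeyValueDeserializationSchema parses keys.
public class KeyParsingDemo {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Parses key bytes the same way the schema does: readValue(bytes, JsonNode.class).
    static JsonNode parseKeyAsJson(String key) throws IOException {
        return MAPPER.readValue(key.getBytes(StandardCharsets.UTF_8), JsonNode.class);
    }

    // Returns true if the key would be accepted by JSONKeyValueDeserializationSchema.
    static boolean keyIsValidJson(String key) {
        try {
            parseKeyAsJson(key);
            return true;
        } catch (IOException e) {
            // JsonParseException (a subclass of IOException) is what the stack trace shows.
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(keyIsValidJson("sensor_5"));             // false: bare token
        System.out.println(keyIsValidJson("\"sensor_5\""));         // true: JSON string literal
        System.out.println(parseKeyAsJson("\"sensor_5\"").asText()); // sensor_5
    }
}
```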
If you don't need the key in your records, you can implement DeserializationSchema instead of KeyedDeserializationSchema. For example:
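import java.io.IOException;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;

import static org.apache.flink.api.java.typeutils.TypeExtractor.getForClass;

// Ignores the Kafka key entirely and parses only the message value as JSON.
public class JSONValueDeserializationSchema implements DeserializationSchema<ObjectNode> {
    private static final long serialVersionUID = -1L;
    private ObjectMapper mapper;

    @Override
    public ObjectNode deserialize(byte[] message) throws IOException {
        // Create the mapper lazily, after the schema has been shipped to the task.
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        ObjectNode node = mapper.createObjectNode();
        if (message != null) {
            // Wrap the payload under "value", mirroring JSONKeyValueDeserializationSchema.
            node.set("value", mapper.readValue(message, JsonNode.class));
        }
        return node;
    }

    @Override
    public boolean isEndOfStream(ObjectNode nextElement) {
        return false;
    }

    @Override
    public TypeInformation<ObjectNode> getProducedType() {
        return getForClass(ObjectNode.class);
    }
}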
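If you do want to keep the key in your records, you can implement KeyedDeserializationSchema as mentioned in Dominik Wosiński's answer. Note that in Flink 1.10 KeyedDeserializationSchema is deprecated in favor of KafkaDeserializationSchema, which is the interface JSONKeyValueDeserializationSchema itself implements.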
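As a sketch of the String-key approach, the class copies the logic of JSONKeyValueDeserializationSchema but stores the key as plain text instead of parsing it as JSON. StringKeyJSONValueDeserializationSchema is a hypothetical name; the code assumes the Flink 1.10 KafkaDeserializationSchema interface (deserialize(ConsumerRecord<byte[], byte[]>), isEndOfStream, getProducedType) and assumes keys are UTF-8 encoded, which is what confluent_kafka does with a Python str key.

```java
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

// Hypothetical class: like JSONKeyValueDeserializationSchema, but the key is kept as a string.
public class StringKeyJSONValueDeserializationSchema implements KafkaDeserializationSchema<ObjectNode> {
    private static final long serialVersionUID = 1L;

    private final boolean includeMetadata;
    private transient ObjectMapper mapper;

    public StringKeyJSONValueDeserializationSchema(boolean includeMetadata) {
        this.includeMetadata = includeMetadata;
    }

    @Override
    public ObjectNode deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        ObjectNode node = mapper.createObjectNode();
        if (record.key() != null) {
            // Assumption: keys are UTF-8 text such as sensor_5; no JSON parsing here.
            node.put("key", new String(record.key(), StandardCharsets.UTF_8));
        }
        if (record.value() != null) {
            // The value is still parsed as JSON, exactly as in the original schema.
            node.set("value", mapper.readValue(record.value(), JsonNode.class));
        }
        if (includeMetadata) {
            node.putObject("metadata")
                .put("offset", record.offset())
                .put("topic", record.topic())
                .put("partition", record.partition());
        }
        return node;
    }

    @Override
    public boolean isEndOfStream(ObjectNode nextElement) {
        return false;
    }

    @Override
    public TypeInformation<ObjectNode> getProducedType() {
        return TypeInformation.of(ObjectNode.class);
    }
}
```

It would plug into the consumer from the question as new FlinkKafkaConsumer<>(KAFKA_TOPIC_INPUT, new StringKeyJSONValueDeserializationSchema(false), properties). For the sample message, node.get("key").asText() should then give sensor_5 and node.get("value").get("value").asDouble() should give 1.0.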