Getting JSON as input in apache-flink



I am trying to receive and access JSON data from a Kafka topic in Flink. What works: producing the data, sending it to a Kafka topic, and receiving it in Flink as a string. But I want to access the data in an object-oriented way (e.g., extracting a specific attribute from each message).

So I have a Kafka producer that sends data (e.g., every second) to a Kafka topic:

ObjectMapper test = new ObjectMapper();
ObjectNode jNode = test.createObjectNode();
jNode.put("LoPos", longPos)
     .put("LaPos", latPos)
     .put("Timestamp", timestamp.toString());

ProducerRecord<String, ObjectNode> rec = new ProducerRecord<>(topicName, jNode);
producer.send(rec);

So the JSON data looks like this:

{"LoPos":10.5,"LaPos":2.5,"Timestamp":"2022-10-31 12:45:19.353"}

What works is receiving the data and printing it as a string:

DataStream<String> input =
        env.fromSource(
                KafkaSource.<String>builder()
                        .setBootstrapServers("localhost:9092")
                        .setBounded(OffsetsInitializer.latest())
                        .setValueOnlyDeserializer(new SimpleStringSchema())
                        .setTopics(topicName)
                        .build(),
                WatermarkStrategy.noWatermarks(),
                "kafka-source");

Printing the data as a string:

DataStream<String> parsed = input.map(new MapFunction<String, String>() {
    private static final long serialVersionUID = -6867736771747690202L;

    @Override
    public String map(String value) {
        System.out.println(value);
        return "test";
    }
});

How can I receive the data in Flink and access it in an object-oriented way (e.g., extracting LoPos from each message)? Which approach would you recommend? I tried it with JSONValueDeserializationSchema, but without success...
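In other words, what I am after is the equivalent of this plain-Jackson lookup, but done per record inside Flink (a sketch with the JSON string hardcoded instead of read from Kafka):

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ExtractLoPos {
    public static void main(String[] args) throws Exception {
        // One message as it arrives from the topic (hardcoded here for the sketch)
        String json = "{\"LoPos\":10.5,\"LaPos\":2.5,\"Timestamp\":\"2022-10-31 12:45:19.353\"}";

        // Parse the string into a JSON tree and pull out a single field
        JsonNode node = new ObjectMapper().readTree(json);
        double loPos = node.get("LoPos").asDouble();
        System.out.println(loPos);   // 10.5
    }
}
```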

Thanks!

Update 1: I updated to Flink 1.16 in order to use JsonDeserializationSchema. Then I created a Flink POJO Event like this:

public class Event {
    public double LoPos;
    public double LaPos;
    public Timestamp timestamp;

    public Event() {}

    public Event(final double LoPos, final double LaPos, final Timestamp timestamp) {
        this.LaPos = LaPos;
        this.LoPos = LoPos;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return String.valueOf(LaPos);
    }
}

To read the JSON data, I implemented the following:

KafkaSource<Event> source = KafkaSource.<Event>builder()
        .setBootstrapServers("localhost:9092")
        .setBounded(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new JsonDeserializationSchema<>(Event.class))
        .setTopics("testTopic2")
        .build();

DataStream<Event> test = env.fromSource(source, WatermarkStrategy.noWatermarks(), "test");

System.out.println(source.toString());
System.out.println(test.toString());

//test.sinkTo(new PrintSink<>());
test.print();
env.execute();

So I expected that when calling source.toString(), the value of LaPos would be returned. But all I get is:

org.apache.flink.connector.kafka.source.KafkaSource@510f3d34

What am I doing wrong?

A recipe in the Immerok Apache Flink Cookbook covers this topic.

In the examples below, I am assuming that Event is a Flink POJO.

With Flink 1.15 or earlier, you should use a custom deserializer:

KafkaSource<Event> source =
        KafkaSource.<Event>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics(TOPIC)
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new EventDeserializationSchema())
                .build();

The deserializer could look something like this:

public class EventDeserializationSchema extends AbstractDeserializationSchema<Event> {

    private static final long serialVersionUID = 1L;

    private transient ObjectMapper objectMapper;

    /**
     * For performance reasons it's better to create one ObjectMapper in this open method rather
     * than creating a new ObjectMapper for every record.
     */
    @Override
    public void open(InitializationContext context) {
        // JavaTimeModule is needed for Java 8 date/time (Instant) support
        objectMapper = JsonMapper.builder().build().registerModule(new JavaTimeModule());
    }

    /**
     * If our deserialize method needed access to the information in the Kafka headers of a
     * KafkaConsumerRecord, we would have implemented a KafkaRecordDeserializationSchema instead of
     * extending AbstractDeserializationSchema.
     */
    @Override
    public Event deserialize(byte[] message) throws IOException {
        return objectMapper.readValue(message, Event.class);
    }
}
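The core of this schema is just the ObjectMapper call, so the mapping itself can be sanity-checked outside of Flink. A sketch, with a trimmed-down Event (the Timestamp field left out to side-step date parsing):

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
import java.nio.charset.StandardCharsets;

public class DeserializeCheck {
    // Trimmed-down Event without the Timestamp field, to keep the sketch self-contained
    public static class Event {
        public double LoPos;
        public double LaPos;
    }

    public static void main(String[] args) throws Exception {
        // Same ObjectMapper setup and readValue call as in open()/deserialize() above
        ObjectMapper objectMapper = JsonMapper.builder().build();
        byte[] message = "{\"LoPos\":10.5,\"LaPos\":2.5}".getBytes(StandardCharsets.UTF_8);
        Event event = objectMapper.readValue(message, Event.class);
        System.out.println(event.LoPos);   // 10.5
    }
}
```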

We simplified this in Flink 1.16, where we added a proper JsonDeserializationSchema that you can use:

KafkaSource<Event> source =
        KafkaSource.<Event>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics(TOPIC)
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new JsonDeserializationSchema<>(Event.class))
                .build();
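One detail worth checking in the Event POJO from the question: the JSON key is "Timestamp" (capital T), while the POJO field is timestamp. Jackson matches property names case-sensitively by default, so depending on configuration that field may fail to deserialize or stay null unless it is mapped explicitly. A sketch of one way to do that, keeping the value as a String for simplicity:

```java
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

public class TimestampMapping {
    public static class Event {
        public double LoPos;
        public double LaPos;
        @JsonProperty("Timestamp")   // maps the capitalized JSON key onto the lowercase field
        public String timestamp;     // String here for simplicity; the question's POJO uses Timestamp
    }

    public static void main(String[] args) throws Exception {
        String json = "{\"LoPos\":10.5,\"LaPos\":2.5,\"Timestamp\":\"2022-10-31 12:45:19.353\"}";
        Event e = new ObjectMapper().readValue(json, Event.class);
        System.out.println(e.LoPos + " / " + e.timestamp);
    }
}
```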

Disclaimer: I work for Immerok.
