在反序列化阶段将一条记录分成多条



我试图通过Flink获得kineesis数据。

在我的例子中,一条记录中有多条消息。我如何将它分成多条记录?(我将把它发送给Elasticsearch.)

我试着去搜索,但找不到合适的答案。

我的代码所做的是从Kinesis获取数据,解压缩gzip,将其转换为字符串,然后使用objectMapper。读取我的POJO的值

有两个POJO:一个用于整个事件,一个用于LogEvents。

{
"messageType":"DATA_MESSAGE","owner":"<account id>",
"logGroup":"<clustername>","logStream":"<log stream name>",
"subscriptionFilters":["<subscription name>"],
"logEvents":[
{"id":"<id>","timestamp":<timestamp>,"message":"msg 1"},
{"id":"<id>","timestamp":<timestamp>,"message":"msg 2"},
{"id":"<id>","timestamp":<timestamp>,"message":"msg 3"},
{"id":"<id>","timestamp":<timestamp>,"message":"msg 4"},
]
}

反序列化数据的processElement()方法可以多次调用output()。每次使用不同的元素。也就是说,你需要在循环中遍历调用output()的批处理元素。

链中的下一个操作符将获取单个元素。

  • 也许你可以使用flatmap函数

  • FlatMapFunction的核心方法。从输入数据集中获取一个元素,并将其转换为零个、一个或多个元素。

    datastream.flatMap(
    new FlatMapFunction<POJO, LogEvent>() {
    @Override
    public void flatMap(POJO input, Collector<LogEvent> out) throws Exception {
    LogEvent logEvent = xxxxx;
    out.collect(logEvent);
    }
    });
    

示例JSON具有数组属性。您需要编写一个自定义的用户定义函数来将JSON转换成多行,并使用Flink中的flatMap函数来打破嵌套。

通过这样做,您将能够以所需的格式从JSON中提取行,并将它们作为行发送给后续的Flink操作符。

最新更新