我试图通过Flink获得kineesis数据。
在我的例子中,一条记录中有多条消息。我如何将它分成多条记录?(我将把它发送给Elasticsearch.)
我试着去搜索,但找不到合适的答案。
我的代码所做的是从Kinesis获取数据,解压缩gzip,将其转换为字符串,然后使用objectMapper。读取我的POJO的值
有两个POJO:一个用于整个事件,一个用于LogEvents。
{
"messageType":"DATA_MESSAGE","owner":"<account id>",
"logGroup":"<clustername>","logStream":"<log stream name>",
"subscriptionFilters":["<subscription name>"],
"logEvents":[
{"id":"<id>","timestamp":<timestamp>,"message":"msg 1"},
{"id":"<id>","timestamp":<timestamp>,"message":"msg 2"},
{"id":"<id>","timestamp":<timestamp>,"message":"msg 3"},
{"id":"<id>","timestamp":<timestamp>,"message":"msg 4"},
]
}
反序列化数据的processElement()
方法可以多次调用output()
。每次使用不同的元素。也就是说,你需要在循环中遍历调用output()
的批处理元素。
链中的下一个操作符将获取单个元素。
-
也许你可以使用flatmap函数
-
FlatMapFunction的核心方法。从输入数据集中获取一个元素,并将其转换为零个、一个或多个元素。
datastream.flatMap( new FlatMapFunction<POJO, LogEvent>() { @Override public void flatMap(POJO input, Collector<LogEvent> out) throws Exception { LogEvent logEvent = xxxxx; out.collect(logEvent); } });
示例JSON具有数组属性。您需要编写一个自定义的用户定义函数来将JSON转换成多行,并使用Flink中的flatMap函数来打破嵌套。
通过这样做,您将能够以所需的格式从JSON中提取行,并将它们作为行发送给后续的Flink操作符。