使用Springboot API获取Apache Flink结果



我已经创建了一个springboot代码来使用API,这样我就可以在用户到达端点时获取flink结果。

static Map<String, Long> finalList = new HashMap<String, Long>();

上面的是主方法之外的类变量。在main方法中,flink的数据流API使用KafkaSource类从Kafka获取数据,并在flink中执行聚合。数据流被放入data_15min变量中,该变量是datastream<String,>>的对象。

我使用了一个map函数从数据流中获取数据,并将它们放入上面的finalList对象中。

data_15min.map(new MapFunction<Map<String, Long>, Object>() {
ObjectMapper objMapperNew = new ObjectMapper();
@Override
public Object map(Map<String, Long> value) throws Exception {
System.out.println("Value in map: " + value);
System.out.println("KeySet: " + value.keySet());
value.forEach((key, val) -> {
if(!dcListNew.contains(key)) {
dcListNew.add(key);
finalList.put(key, val);
} else {
finalList.replace(key, val);
}
});
System.out.println("DC List NEw inside: " + dcListNew);
System.out.println("Final List inside: " + objMapperNew.writeValueAsString(finalList));
return finalList;
}
});
System.out.println("Final List: " + finalList);

在上面的map函数中打印finalList时,我得到的数据为

Final List inside: {"ABC": 2, "AXY": 10}

在映射的外部,当我打印时,我仍然看到它是:Final List: {}

有人可以帮助我,让这个数据是可用的以外的地图方法,我把它发送到springboot api响应?

我认为这不起作用的主要原因是由于并发性,我假设你在你的环境中某处调用execute(),所以只有在execute()完成之后,你的finalList才被记录填充。

请注意,这样工作是不安全的,很可能会在短时间内导致错误。总的来说,似乎解决方案的体系结构缺少像数据库这样的东西,您将存储Flink的结果,然后从Spring访问db。

最新更新