>我有 Spring boot 应用程序并与 Apache Flink 集成。我想从 Kafka 系统读取数据,并将它们暴露给 REST 端点。
以下是我的简单数据,
@GetMapping("/details/{personName}")
public String getPersonDetails() throws Exception {
StreamExecutionEnvironment env = LocalStreamEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "group_id");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test-topic-1",
new SimpleStringSchema(), properties);
consumer.setStartFromEarliest();
DataStream<String> stream = env.addSource(consumer);
stream.map(new MapFunction<String, String>() {
private static final long serialVersionUID = 1L;
@Override
public String map(String value) throws Exception {
logger.info(value);
return value;
}
}).print();
env.execute();
return "hello world";
}
我的问题是,
- 我的 Kafka 返回字符串值如下,
"id":"1","PersonName":"John","address":"Bristol","weight":"34", "country":"UK","timeStamp":"2020-08-08T10:25:42"}
{"id":"2","PersonName":"Mann","address":"Bristol","weight":"88", "country":"UK","timeStamp":"2020-08-08T10:25:42"}
{"id":"3","PersonName":"Chris","address":"Leeds","weight":"12", "country":"UK","timeStamp":"2020-08-08T10:25:42"}
{"id":"4","PersonName":"John","address":"Bristol","weight":"44", "country":"UK","timeStamp":"2020-08-08T10:25:42"}
{"id":"5","PersonName":"John","address":"NewPort","weight":"26", "country":"UK","timeStamp":"2020-08-08T10:25:42"}
{"id":"6","PersonName":"Mann","address":"Bristol","weight":"89", "country":"UK","timeStamp":"2020-08-08T10:25:42"}
如何通过应用过滤器转换为 JSON 来返回。例如,如果我从 REST 调用输入的是"John",我想对它们进行分组并对权重值求和并返回为 JSON(仅名称和权重)。
第二个问题, 我无法停止执行环境。还有其他选择吗?我检查了 Flink 文档,但我没有得到任何适合我的情况。
第三个问题, 我想保持环境是急于加载,尝试使用静态块,但它也需要更多时间。
NFRS:
我在 Kafka 中有大量数据,所以想要扩展和快速处理。
听起来您可能需要花更多时间查看 Flink 文档。但简而言之...
- 添加一个
MapFunction
,用于将字符串解析为 JSON,提取名称和权重,并将其输出为 Tuple2<String、Integer> 或某个自定义 Java 类。 - 执行 groupBy(名称字段),后跟一个对权重求和并将其保存在状态的
ProcessFunction
。 - 使用
QueryableState
向作为程序 main() 方法的一部分运行的代码公开状态(总和权重)。 - 在 main 方法中,实现一个 REST 处理程序,该处理程序使用
QueryableStateClient
获取给定名称的权重。