Flink 数据流转换并公开到 REST 端点



>我有 Spring boot 应用程序并与 Apache Flink 集成。我想从 Kafka 系统读取数据,并将它们暴露给 REST 端点。

以下是我的简单数据,

@GetMapping("/details/{personName}")
public String getPersonDetails() throws Exception {
StreamExecutionEnvironment env =  LocalStreamEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "group_id");

FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test-topic-1",
new SimpleStringSchema(), properties);
consumer.setStartFromEarliest();
DataStream<String> stream = env.addSource(consumer);
stream.map(new MapFunction<String, String>() {
private static final long serialVersionUID = 1L;
@Override
public String map(String value) throws Exception {
logger.info(value);
return value;
}
}).print();
env.execute();
return "hello world";
}

我的问题是,

  • 我的 Kafka 返回字符串值如下,

"id":"1","PersonName":"John","address":"Bristol","weight":"34", "country":"UK","timeStamp":"2020-08-08T10:25:42"}
{"id":"2","PersonName":"Mann","address":"Bristol","weight":"88", "country":"UK","timeStamp":"2020-08-08T10:25:42"}
{"id":"3","PersonName":"Chris","address":"Leeds","weight":"12", "country":"UK","timeStamp":"2020-08-08T10:25:42"}
{"id":"4","PersonName":"John","address":"Bristol","weight":"44", "country":"UK","timeStamp":"2020-08-08T10:25:42"}
{"id":"5","PersonName":"John","address":"NewPort","weight":"26", "country":"UK","timeStamp":"2020-08-08T10:25:42"}
{"id":"6","PersonName":"Mann","address":"Bristol","weight":"89", "country":"UK","timeStamp":"2020-08-08T10:25:42"}

如何通过应用过滤器转换为 JSON 来返回。例如,如果我从 REST 调用输入的是"John",我想对它们进行分组并对权重值求和并返回为 JSON(仅名称和权重)。

  • 第二个问题, 我无法停止执行环境。还有其他选择吗?我检查了 Flink 文档,但我没有得到任何适合我的情况。

  • 第三个问题, 我想保持环境是急于加载,尝试使用静态块,但它也需要更多时间。

NFRS:

我在 Kafka 中有大量数据,所以想要扩展和快速处理。

听起来您可能需要花更多时间查看 Flink 文档。但简而言之...

  1. 添加一个MapFunction,用于将字符串解析为 JSON,提取名称和权重,并将其输出为 Tuple2<String、Integer> 或某个自定义 Java 类。
  2. 执行 groupBy(名称字段),后跟一个对权重求和并将其保存在状态的ProcessFunction
  3. 使用QueryableState向作为程序 main() 方法的一部分运行的代码公开状态(总和权重)。
  4. 在 main 方法中,实现一个 REST 处理程序,该处理程序使用QueryableStateClient获取给定名称的权重。

相关内容

  • 没有找到相关文章

最新更新