我是 Flink 的新手。我有一个要求,我需要从 Kafka 流中连续读取数据,但要批量写入。从而减少MongoServer上的查询数量。
请指导我最好的方法。
我目前尝试的。
- 从卡夫卡源读取数据
- 应用5分钟的时间窗口
- 减少条目以创建条目列表。
- 从 MongoSink 函数中读取列表,执行 BulkWrite 操作
谢谢 阿什尼克
以上似乎应该有效。由于Mongo客户端非常简单,如果你想提高效率,你可以实现自己的有状态ProcessFunction
,它保留一个条目列表,并在列表达到一定大小或经过足够的时间时刷新到MongoDB。