从MongoDB转换一组特定的记录



我有一个周期性触发的批处理作业,它将数据写入MongoDB。这项工作需要大约10分钟,之后我想接收这些数据,并使用Apache Flink进行一些转换(映射、筛选、清理…(。记录之间存在一些依赖关系,这意味着我必须一起处理它们。例如,我喜欢转换客户id为45666的最新批处理作业中的所有记录。结果将是一个汇总记录。

是否有任何最佳实践或方法可以在不自己实现所有内容的情况下做到这一点(从最新工作中提取客户ID,为每个客户选择记录和转换,标记转换后的客户,等等(?

我无法流式传输,因为我必须将多个记录转换在一起,而不是逐个转换。

目前我正在使用Spring Batch、MongoDB、Kafka,并考虑使用Apache Flink。

可以设想,您可以将MongoDB更改流连接到Flink,并将其用作所描述任务的基础。涉及10-35GB数据的事实并不排除使用Flink流,因为如果Flink的状态不适合堆,您可以将其配置为溢出到磁盘。

然而,在得出这是一种明智的做法之前,我想更好地了解情况。

相关内容

  • 没有找到相关文章

最新更新