Kafka -> Flink DataStream -> MongoDB



我想设置 Flink,以便它将数据流从 Apache Kafka 转换并重定向到 MongoDB。出于测试目的,我正在构建flink-streaming-connectors.kafka示例(https://github.com/apache/flink(。

Flink 正确对 Kafka 流进行了红色,我可以映射它们等,但是当我想将每个接收和转换的消息保存到 MongoDB 时,就会出现问题。我发现的关于MongoDB集成的唯一例子是来自github的flink-mongodb-test。不幸的是,它使用静态数据源(数据库(,而不是数据流。

我相信MongoDB应该有一些DataStream.addSink实现,但显然没有。

实现它的最佳方法是什么?我是否需要编写自定义接收器函数,或者我缺少某些内容?也许应该以不同的方式完成?

我不依赖于任何解决方案,因此任何建议将不胜感激。

下面是一个示例,我究竟要获得什么作为输入,以及我需要存储什么作为输出。

Apache Kafka Broker <-------------- "AAABBBCCCDDD" (String)
Apache Kafka Broker --------------> Flink: DataStream<String>
Flink: DataStream.map({
    return ("AAABBBCCCDDD").convertTo("A: AAA; B: BBB; C: CCC; D: DDD")
})
.rebalance()
.addSink(MongoDBSinkFunction); // store the row in MongoDB collection

正如你在这个例子中看到的,我主要使用 Flink 进行 Kafka 的消息流缓冲和一些基本的解析。

作为Robert Metzger答案的替代方案,您可以再次将结果写入Kafka,然后使用维护的kafka连接器之一将主题的内容放入MongoDB数据库中。

夫卡 -> 弗林克 -> 卡夫卡 -> 蒙戈/任何东西

使用这种方法,您可以保持"至少一次语义"的行为。

目前 Flink 中没有可用的 Streaming MongoDB sink。

但是,有两种方法可以将数据写入MongoDB:

  • 使用 Flink 的DataStream.write()调用。它允许您将任何输出格式(来自批处理 API(与流式处理一起使用。使用 Flink 的 HadoopOutputFormatWrapper,您可以使用官方的 MongoDB Hadoop 连接器

  • 自行实现接收器。使用Streaming API实现sinks非常容易,我相信MongoDB有一个很好的Java客户端库。

这两种方法都不提供任何复杂的处理保证。但是,当你将 Flink 与 Kafka 一起使用(并且启用了检查点(时,你将至少有一次语义:在错误情况下,数据将再次流式传输到 MongoDB 接收器。如果要执行幂等更新,则重做这些更新不应导致任何不一致。

如果你真的需要 MongoDB 的精确一次语义,你可能应该在 Flink 中提交一个 JIRA 并与社区讨论如何实现它。

相关内容

  • 没有找到相关文章

最新更新