使用顶点将 Mongo 集合流式传输到 Kafka 主题应该采用什么方法?



我有一个MongoDB集合。我想使用 Vert.x 将此集合的子集(基于某些查询(流式传输到 Kafka 主题。

到目前为止,我已经为KafkaWriteStream创建了一个 Vert.x 顶点,它似乎适用于虚拟硬编码字符串。

不幸的是,我不确定如何从MongoDB获取文档流,这些文档流以后可以使用专用的顶点流式传输到Kafka

我该如何解决这个问题?有人有任何相关链接或信息吗?

Vert.x Mongo 客户端模块允许从 Mongo 查询中获取ReadStream

当你拥有它时,连同你的KafkaWriteStream,你可以开始从Mongo读取数据并将其发送到Kafka。

但要注意背压:如果 Mongo 加载数据的速度太快,不要让 Kafka 客户端不堪重负。您的算法应如下所示:

readstream.handler(data -> {
transformedData = transform(data); // your own transformations
writestream.write(transformedData);
if (writestream.writeQueueFull()) {
readstream.pause();
writestream.drainHandler(done -> {
readstream.resume();
});
}
}); 

最新更新