我有一个MongoDB集合。我想使用 Vert.x 将此集合的子集(基于某些查询(流式传输到 Kafka 主题。
到目前为止,我已经为KafkaWriteStream
创建了一个 Vert.x 顶点,它似乎适用于虚拟硬编码字符串。
不幸的是,我不确定如何从MongoDB获取文档流,这些文档流以后可以使用专用的顶点流式传输到Kafka。
我该如何解决这个问题?有人有任何相关链接或信息吗?
Vert.x Mongo 客户端模块允许从 Mongo 查询中获取ReadStream
。
当你拥有它时,连同你的KafkaWriteStream
,你可以开始从Mongo读取数据并将其发送到Kafka。
但要注意背压:如果 Mongo 加载数据的速度太快,不要让 Kafka 客户端不堪重负。您的算法应如下所示:
readstream.handler(data -> {
transformedData = transform(data); // your own transformations
writestream.write(transformedData);
if (writestream.writeQueueFull()) {
readstream.pause();
writestream.drainHandler(done -> {
readstream.resume();
});
}
});