我们正在开源Apache kafka连接器上使用mongodb kafka连接器,用于从Mongo到HDFS的json数据的数据接收。我们有kafka消费者,它读取kafka中的数据更改并将其写入hdfs文件。
我们希望将源连接器安排在不同的特定时间。
我们需要根据预定日期触发卡夫卡消息。
我们可以使用源连接器的配置属性来处理这种情况,从合流到自定义轮询间隔
链接:
https://www.mongodb.com/docs/kafka-connector/current/source-connector/configuration-properties/all-properties/#std-标签源配置所有属性
===>poll.wait.time.ms可以是一个解决方案
否则,有Kafka消息调度程序:
https://github.com/etf1/kafka-message-scheduler
使用调度器自动消耗Kafka中的数据
当您创建一个新的调度程序时,vkconfig脚本会采取以下步骤:
使用为计划程序指定的名称创建新的Vertica架构。您可以在配置过程中使用此名称来标识计划程序。
在新创建的模式中创建管理Kafka数据加载所需的表。
来自MongoDB Kafka connect官方文档:
https://www.mongodb.com/docs/kafka-connector/current/source-connector/configuration-properties/all-properties/#change-流
使用以下配置设置可以为更改流指定聚合管道,并为更改流游标指定读取首选项。
poll.await.time.ms==在检查更改流游标以获取新结果之前等待的时间(以毫秒为单位(。
或使用:poll.max.batch.size==在轮询更改流游标以查找新数据时,单批读取的最大文档数。您可以使用此设置来限制连接器内部缓冲的数据量。