卡夫卡S3连接:基于挂钟的定时旋转似乎不会写



我们使用的是Confluent的Kafka S3连接器,版本5.2.1。 在分布式辅助角色设置中使用一个节点运行。

根据文档,我们应该能够在大小和基于时间的间隔上将刷新设置为 S3。

我们正在使用以下刷新设置

{ 
"rotate.interval.ms": 300000, // 5 minutes
"flush.size": 1000,
"timestamp.extractor": "Wallclock" // default
... (other settings)
}

但我没有看到任何数据写入少于 1000 条消息但有可用数据的主题中。

但是,当我将设置更改为较小的刷新大小并删除旋转间隔时:

{ 
"flush.size": 5, // some small amount
"timestamp.extractor": "Wallclock", // default
... (other settings)
}

在所有其他设置相同的情况下,我可以立即看到 s3 存储桶中的数据。

我没有更改任何其他设置,所以我非常有信心与 s3 的连接,并且我看到正在部署的任务工作者。

我错过了什么吗?

如果要每 5 分钟一次,则应首选rotate.schedule.interval.ms,这会强制 Connect 在此间隔内转储文件。

此配置可确保按配置的时间间隔调用文件提交。提交将在计划的时间执行,无论以前的提交时间或消息数如何

rotate.interval.ms将根据批处理中第一个使用的记录检查记录时间戳。

时间间隔是使用时间戳提取器确定

如果记录少于刷新大小,则完整批处理将只在内存中等待,直到新记录的时间戳差异大于第一个看到的记录。

flush.size始终优先于所有其他时间设置,当我上次查看源代码时写入文件时。

最新更新