为Kafka调整S3文件大小



我正在尝试深入了解S3连接器的flush.sizerotate.interval.ms配置。我部署了S3连接器,我的文件大小似乎从6 kb一直到30 mb,不知道这里是否有人可以帮助我提供如何获得几乎相等的文件大小的建议。

以下是我的设置:flush.size= 200000rotate.interval.ms=10min

我们也尝试了基于这个git中的一个例子来推出我们自己的连接器https://github.com/canelmas/kafka-connect-field-and-time-partitioner但我们仍然无法使文件大小大致相同。

S3 Sink连接器将数据写入每个Kafka分区的分区路径和partitione.class.定义的分区路径

基本上,S3连接器将缓冲区冲洗到以下状态。

  1. rotate.schedule.interval.ms:如果此时间已过
  2. rotate.interval.ms:按时间戳计算,时间已经过去。提取程序时间

注意:这有助于清除积压数据,让我们假设rotate.interval.ms然后我们有6个小时的延迟数据,所以每个时间戳都超过了10如果数据不是,分钟刷新将在几秒钟内延迟它将等待接收下一轮。间隔毫秒通过

  1. flush.size:假设数据流非常高,如果消息在点1&2,则flush将触发。同时,如果数据大小流低,则刷新将基于点1&2

如果是基于时间的分区

  1. partition.duration.ms:定义在单个编码分区目录中刷新到s3的最长时间

最新更新