我已经阅读了他们的博客并理解了他们的例子。https://www.confluent.io/blog/apache-kafka-to-amazon-s3-exactly-once/
但我正试图把我的头脑包裹在我所经历的这个场景中。我目前的配置是:
"flush.size": "50",
"rotate.interval.ms": "-1",
"rotate.schedule.interval.ms": "300000",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"partition.duration.ms": "3600000",
"path.format": "YYYY/MM/dd/HH",
"timestamp.extractor": "Wallclock"
根据我所读到的配置。连接器将提交50
记录的文件或300000ms
(5min(之后的文件,以先到者为准。如果连接器将文件上传到s3,但未能提交给Kafka,那么由于我设置了轮换计划间隔,Kafka将如何重新上传覆盖s3文件的相同记录?这不会导致s3中的重复吗?
S3接收器连接器的文档是另一个很好的资源,它描述了连接器如何保证一次交付到S3,更重要的是,哪些功能组合提供(或不提供(这种保证。
具体来说,该文件中的一个部分写道:
为了保证
TimeBasedPartitioner
的语义只有一次,连接器必须配置为使用TimestampExtractor
的确定性实现和确定性旋转策略。确定性时间戳提取器是Kafka记录(timestamp.extractor=Record
(或记录字段(timestamp.extractor=RecordField
(。确定性旋转策略配置为rotate.interval.ms
(设置rotate.schedule.interval.ms
是不确定性的,将使一次保证失效(。
您的S3接收器连接器配置确实使用了确定性分区器(通过"partitioner.class":"io.confluent.connect.storage.partitioner.TimeBasedPartitioner"(,但它使用了不确定性的Wallclock时间戳提取器(通过"timestamp.extractor": "Wallclock"
(。这是不确定的,因为如果连接器确实必须重新启动(例如,由于故障(并重新处理特定记录,它将在稍后的时间重新处理该记录,并且wallclock时间戳提取器将为该记录选择不同的时间。
其次,您的连接器使用rotate.schedule.interval.ms
选项,文档指出该选项与一次性交付不兼容。例如,如果连接器必须重新处理一系列Kafka记录,它可能会将这些记录分解为与第一次不同的S3对象,这意味着S3连接器最终会写入不同的S3目标。
总之,使用您的配置的S3接收器连接器不会提供一次交付保证。
使用rotate.interval.ms和时间戳。extractor设置为Record此外,确保您正在阅读的主题的时间戳类型设置为"LOG_APPEND_TIME">
我不确定辅助配置是否应该将使用者隔离级别属性设置为已提交读取。取决于S3连接器是否自动执行此操作。
即便如此,当时间戳在broker集群中的领导人选举期间没有单调增加时,事情也可能出错。注意此问题的状态