我们有一个生产者应用程序,它现在已经运行了几天,并且正在向主题A生成数据。我们希望启动 hdfs 连接器以从主题 A 读取,而不是从偏移量 0 读取(因为这会导致巨大的滞后(。我们想从最新的偏移量开始(主题 A 中一直在有新数据(。
1( 由于连接器从 hdfs 中的主题名称中获取偏移量信息,由于 hdfs 中不存在文件,我们如何从最新的偏移量中读取?
2(我能想到的一种选择是手动为每个分区创建具有最新偏移量的虚拟文件,但我们在这里谈论的是主题A中的60个分区,那么有没有更优雅的方法可以做到这一点?
NoName,最近添加了HDFS连接器在HDFS中没有文件名的情况下重置为最新提交偏移量的功能。
您可以在版本 4.0.1或 4.1.0 及更高版本中找到它。
HDFS 连接器是管理使用者偏移本身的接收器连接器。它旨在这样做,以便在将文件导出到 HDFS 时实现恰好一次的语义。在上述之前的版本中,如果连接器在 HDFS 中找不到任何文件,它将从主题的最早偏移量开始消费,而不管任何使用者设置如何。
您可以在此处找到相关更改,这些更改现在允许连接器在 HDFS 中没有文件的情况下查询提交的偏移量:
https://github.com/confluentinc/kafka-connect-hdfs/pull/299 和 https://github.com/confluentinc/kafka-connect-hdfs/pull/305
您可以设置此属性,以使连接的使用者组从主题中最新的可用偏移量开始
consumer.auto.offset.reset=latest
虽然,Connect 通常可以相当快地赶上一个大型集群和每个分区 1 个任务,所以从最早开始应该不会那么糟糕