kafka通过时间戳定义使用者偏移量(不接受iso8601日期?)



我有一组从不同消费者组中的kafka读取的应用程序,现在我想将它们加入到同一消费者组中。为了做到这一点并避免消耗大量旧数据,我想定义消费者组的偏移量。

我找到了一些方法来做到这一点,例如在这里和这里

如何在卡夫卡创建一个新的消费者群体

我设法创建了我的组,并定义了例如最近的偏移量或所有主题的偏移量。类似这样的东西:

alias kcg=/opt/kafka/bin/kafka-consumer-groups.sh
kcg --bootstrap-server localhost:9093 --command-config /opt/kafka/config/consumer.properties --group test-group --reset-offsets --to-latest --all-topics --execute
kcg --bootstrap-server localhost:9093 --command-config /opt/kafka/config/consumer.properties --group test-group --reset-offsets --shift-by -10 --all-topics --execute

现在,当我试图用一个简单的日期来定义它时,它会抱怨,正如预期的那样,说我的日期不是ISO8601:

kcg --bootstrap-server localhost:9093 --command-config /opt/kafka/config/consumer.properties --group test-group --reset-offsets --to-datetime "20220330093926Z" --all-topics --execute
Error: Executing consumer group command failed due to Error parsing timestamp. It does not contain a 'T' according to ISO8601 format
java.text.ParseException: Error parsing timestamp. It does not contain a 'T' according to ISO8601 format
at org.apache.kafka.common.utils.Utils.getDateTime(Utils.java:1347)
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.prepareOffsetsToReset(ConsumerGroupCommand.scala:811)
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.$anonfun$resetOffsets$1(ConsumerGroupCommand.scala:438)

但当我尝试使用ISO8601日期时,我仍然会收到一个错误,即它无法解析日期。

kcg --bootstrap-server localhost:9093 --command-config /opt/kafka/config/consumer.properties --group test-group --reset-offsets --to-datetime "20220330T093926Z" --all-topics --execute
Error: Executing consumer group command failed due to Unparseable date: "2022-03-30T09:39:26Z"
java.text.ParseException: Unparseable date: "2022-03-30T09:39:26Z"
at java.base/java.text.DateFormat.parse(DateFormat.java:395)
at org.apache.kafka.common.utils.Utils.getDateTime(Utils.java:1364)
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.prepareOffsetsToReset(ConsumerGroupCommand.scala:811)
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.$anonfun$resetOffsets$1(ConsumerGroupCommand.scala:438)

我举了个例子,他们都得到了相同的错误:

2022-03-30T00:01:25+0000
2022-03-30T09:44:25+0000    
2022-03-30T09:39:26Z    
20220330T093926Z

我确实试着用不同的方式来定义日期,但我不是很幸运:java.text.ParseException:无法解析的日期

有人有这样做的经验吗?我可能做错了什么?

感谢评论中的反馈,我犯了一个非常基本的错误,我以为我在没有Z的情况下进行了测试,但不知何故,我没有。类似于";2022-03-30T001:01.001";工作

添加小数秒数:2022-03-30T09:44:25.000

相关内容

  • 没有找到相关文章

最新更新