我有一组从不同消费者组中的kafka读取的应用程序,现在我想将它们加入到同一消费者组中。为了做到这一点并避免消耗大量旧数据,我想定义消费者组的偏移量。
我找到了一些方法来做到这一点,例如在这里和这里
如何在卡夫卡创建一个新的消费者群体
我设法创建了我的组,并定义了例如最近的偏移量或所有主题的偏移量。类似这样的东西:
alias kcg=/opt/kafka/bin/kafka-consumer-groups.sh
kcg --bootstrap-server localhost:9093 --command-config /opt/kafka/config/consumer.properties --group test-group --reset-offsets --to-latest --all-topics --execute
kcg --bootstrap-server localhost:9093 --command-config /opt/kafka/config/consumer.properties --group test-group --reset-offsets --shift-by -10 --all-topics --execute
现在,当我试图用一个简单的日期来定义它时,它会抱怨,正如预期的那样,说我的日期不是ISO8601:
kcg --bootstrap-server localhost:9093 --command-config /opt/kafka/config/consumer.properties --group test-group --reset-offsets --to-datetime "20220330093926Z" --all-topics --execute
Error: Executing consumer group command failed due to Error parsing timestamp. It does not contain a 'T' according to ISO8601 format
java.text.ParseException: Error parsing timestamp. It does not contain a 'T' according to ISO8601 format
at org.apache.kafka.common.utils.Utils.getDateTime(Utils.java:1347)
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.prepareOffsetsToReset(ConsumerGroupCommand.scala:811)
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.$anonfun$resetOffsets$1(ConsumerGroupCommand.scala:438)
但当我尝试使用ISO8601日期时,我仍然会收到一个错误,即它无法解析日期。
kcg --bootstrap-server localhost:9093 --command-config /opt/kafka/config/consumer.properties --group test-group --reset-offsets --to-datetime "20220330T093926Z" --all-topics --execute
Error: Executing consumer group command failed due to Unparseable date: "2022-03-30T09:39:26Z"
java.text.ParseException: Unparseable date: "2022-03-30T09:39:26Z"
at java.base/java.text.DateFormat.parse(DateFormat.java:395)
at org.apache.kafka.common.utils.Utils.getDateTime(Utils.java:1364)
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.prepareOffsetsToReset(ConsumerGroupCommand.scala:811)
at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.$anonfun$resetOffsets$1(ConsumerGroupCommand.scala:438)
我举了个例子,他们都得到了相同的错误:
2022-03-30T00:01:25+0000
2022-03-30T09:44:25+0000
2022-03-30T09:39:26Z
20220330T093926Z
我确实试着用不同的方式来定义日期,但我不是很幸运:java.text.ParseException:无法解析的日期
有人有这样做的经验吗?我可能做错了什么?
感谢评论中的反馈,我犯了一个非常基本的错误,我以为我在没有Z的情况下进行了测试,但不知何故,我没有。类似于";2022-03-30T001:01.001";工作
添加小数秒数:2022-03-30T09:44:25.000