卡夫卡消费者-如何识别偏移跳过/丢失偏移



设置:
我们有一个Debezium/Kafka Connect设置,其中有一个Debezium-Oracle生产者和一个Confluend JDBC消费者/接收器。

起始位置/背景/问题:
由于流量大,我们已将log.retention.minutes减少到1h,这在99%的时间内都是合适的。但在极少数情况下,卡夫卡的消费者会放慢速度,再也跟不上了。在这种情况下,消息将在消费者拾取和处理之前在Kafka中被删除(由于上述保留期(。在默认配置中,使用者将跳过缺失的记录,选择最早可用的偏移量。这导致了目标方面的不一致性。

问题:
如何处理这些情况(如果不能提高log.retension.minutes(
注意:如果消费者在找不到给定偏移量的消息的情况下抛出异常/stop/etc,我们也可以

我们迄今为止所做的努力
我们尝试将使用者的auto.offset.reset设置为none,并希望使用者在找不到偏移时停止。理论上这应该是可行的。在实践中,当消费者被实例化时,它会立即抛出异常,因为没有第一个/初始偏移。

最后的想法 那么我们可以使用另一个配置参数吗?(类似于"如果偏移量丢失/跳过,但不是第一次启动时抛出异常"?(或者,如果消费者跳过消息,我们是否可以监控JMX度量?

为消费者设置auto.offset.resetnone,并期望消费者在找不到偏移时停止

这就是它的作用,是的。

在实践中,当消费者被实例化时,它会立即抛出异常,因为没有第一个/初始偏移

您需要首先实际初始化组,然后将其查找到最早的偏移量。例如kafka-consumer-offsets --reset-offsets --to-earliest --group connect-<name>

类似于"如果偏移量丢失/跳过,但不是在第一次启动时抛出异常"?(

CCD_ 8与";第一个";以及";下一个";启动。但是,您可以使用consumer.override.auto.offset.reset=earliest创建连接器,然后等待它运行,然后使用PUT /config调用将其设置回none。然后每当它再次停止运行时重复。

JMX度量,我们可以在消费者跳过消息的情况下进行监控

我不知道;度量主要是报告处理的字节。您还必须跟踪您期望它读取的字节数。

您需要其他监控解决方案来检测代理上正在删除的日志段,并将这些偏移范围与消费者当前读取的偏移进行比较。

最新更新