节点KAFKA-复制消息即将到来



我在使用消费者组消费kafka消息时会得到重复的消息。

我正在使用此nodejs库。https://www.npmjs.com/package/kafka-node

我的消费者代码在下面给出

const config = require( '../../configs' );
const kafka = require( 'kafka-node' );
var options = {
    id: 'consumer1',
    kafkaHost: config.kafka.prod.kafka_host, //multiple kafka hosts (comma separated)
    groupId: "test-group2",
    sessionTimeout: 15000,
    protocol: ['roundrobin'],
    fromOffset: 'earliest'
};
var consumerGroup = new kafka.ConsumerGroup( options, 'my-replicated-topic3' );
consumerGroup.on( 'message', function ( message ) {
    console.log( message );
} );

我要低于结果。

{ topic: 'my-replicated-topic3',
  value: '{"meta":{"topic":"my-replicated-topic3","added_at":"2019-02-22T09:25:54.708Z","server":"cron"},"data":"1550827554708 ::: Totam quis qui. Sit dolore laboriosam odio. Facilis porro et quam repellat pariatur. Ad voluptatem quidem."}',
  offset: 8941,
  partition: 0,
  highWaterOffset: 8966,
  key: null }
  ---
  ---
  ---
  ---
  { topic: 'my-replicated-topic3',
  value: '{"meta":{"topic":"my-replicated-topic3","added_at":"2019-02-22T09:25:54.708Z","server":"cron"},"data":"1550827554708 ::: Totam quis qui. Sit dolore laboriosam odio. Facilis porro et quam repellat pariatur. Ad voluptatem quidem."}',
  offset: 8941,
  partition: 0,
  highWaterOffset: 8970,
  key: null }

您可以看到每几个记录后都会重复相同的消息。这里 offset 是相同的,但是 HighWaterOffset 是所有重复的消息。

请建议解决此问题的方法。

您是否重新构成消费者?您的消费者如何承担偏移?

默认情况下,您的消费者每5秒自动承担一次偏移。您的图书馆就是这种情况。

如果您在承诺偏移之前重新确定消费者,它将从最后一个承诺的偏移重新启动。

最新更新