KAFKA-NODE:管理多个消费者组



我有一个关于管理多个CG的问题,创建了三个消费者组,每个CG都有自己的kafka服务、组Id和主题。

现在我收到了预期的消息,但是,我想知道是否有可能创建下一个场景:

创建三个用户组,但只接收来自其中一个用户组的消息,如果他的kafka服务将失败,则暂时暂停/保持其他用户组,从下一个用户群接收消息,第三个用户群也是如此。

下面是我的代码示例:

function createConsumerGroup(topics){
const ConsumerGroup = kafka.ConsumerGroup;
//CREATE CONSUMER GROUPS FOR EVERY SERVICE
for(let i = 0; i < config.kafka_service.length ;i++){  //3
const options = {
groupId: config.kafka_service[i]['groupId'],
host: config.kafka_service[i]['zookeeperHost'],
kafkaHost: config.kafka_service[i]['kafkaHost'],
sessionTimeout: 15000,
protocol: ['roundrobin'],
fromOffset: 'latest'
}
//assign all services CG names and create [i] consumer groups!
let customConsumerGroupName = config.kafka_service[i]['consumerGroupName'];
customConsumerGroupName = new ConsumerGroup(options, topics);
customConsumerGroupName.on('connect', (resp) => {
console.log(`${config.kafka_service[i]['consumerGroupName']} is connected!`);
});
if(i > 0){
//pause consumers exept FIRST
customConsumerGroupName.pause();
}

customConsumerGroupName.on('message', (message) => {
console.log(message);
});
customConsumerGroupName.on('error', (error) => {
console.log('consumer group error: ', error);
//HERE I NEED TO CALL SECOND CONSUMER TO STEP UP
//MAYBE consumerGroup.resume(); ???
});
}
}

希望它听起来可以理解,谢谢:(

因此,由于Node包的"ConsumerGroup"的名称,似乎出现了混淆。在Kafka术语中,消费者组仅由每个消费者使用的groupId控制。具有相同groupId的消费者不会收到重复的消息,每个主题消息只由一个消费者读取。如果一个消费者宕机,kafka会检测到这一点,并将其分区交给一个单独的消费者。

节点"ConsumerGroup"实际上只是另一个Kafka消费者(在Kafka>0.9时,由Kafka而不是zookeeper管理组的新消费者(。

因此,使用Node ConsumerGroup来利用kafka消费者组的方法如下:

function createConsumerGroup(topics){
const ConsumerGroup = kafka.ConsumerGroup;
//CREATE CONSUMER GROUPS FOR EVERY SERVICE
for(let i = 0; i < config.kafka_service.length ;i++){  //3
const options = {
groupId: 'SOME_GROUP_NAME',
host: config.kafka_service[i]['zookeeperHost'],
kafkaHost: config.kafka_service[i]['kafkaHost'],
sessionTimeout: 15000,
protocol: ['roundrobin'],
fromOffset: 'latest'
}
//assign all services CG names and create [i] consumer groups!
let customConsumerGroupName = config.kafka_service[i]['consumerGroupName'];
customConsumerGroupName = new ConsumerGroup(options, topics);
customConsumerGroupName.on('connect', (resp) => {
console.log(`${config.kafka_service[i]['consumerGroupName']} is connected!`);
});
customConsumerGroupName.on('message', (message) => {
console.log(message);
});
customConsumerGroupName.on('error', (error) => {
console.log('consumer group error: ', error);
//Error handling logic here, restart the consumer that failed perhaps? 
//Depends on how you want to managed failed consumers.
});
}
}

Nodes ConsumerGroup的每个实例都将是组"SOME_group_NAME"的成员,并且使用同一groupId创建的任何其他使用者也将充当同一kafka使用者组的成员,而不考虑服务器等。

消费者团体解决两个中心场景:

1.扩展您可以增加一个组中的使用者数量,以处理该组正在使用(扩展(的主题中产生的越来越多的消息

2.故障转移通过让一组使用者阅读同一主题,他们将自动处理一个或多个使用者出现故障的情况。

因此,您不需要拥有"备用"消费者组,在那里您必须自己处理哪些是活动的,而只需要依赖Kafka内置的故障切换。消费者可以在几个不同的容器中运行(甚至在不同的数据中心中(,Kafka将自动确保将消息传递给各个消费者,无论他们在哪里,也无论在任何给定的时间有多少消息在运行。

最新更新