代理关闭时,节点kafka消费者组未接收到消息



我在卡夫卡有一个主题,名为internal。我使用以下命令创建了主题

/opt/kafka/bin/kafka-topics.sh 
--create --zookeeper zookeeper:2181 
--replication-factor 3 -partitions 6 
--topic internal

我需要在三个不同的节点服务器中使用所有消息。所以我使用kafka节点模块作为具有不同消费者名称的消费者组。我已经创建了一个名为group1group2group3的使用者组名称。

一切正常,我可以消费所有消费者的所有信息。

但当任何一个经纪人破产时,消费者都不会得到任何信息。当我列出所有消费者组时,它不会显示特定的组ID。

(例如)如果nodeserver 1关闭,则在名为group1的代理中没有可用的组

即使我重新启动节点服务器,它也不会在broker中创建任何组,也不会在相应的节点服务器中使用任何消息。但当代理启动,节点服务器重新启动时,它正在代理中创建一个组,节点服务器可以接收消息。

consumer.js

const options = {
kafkaHost: process.env.KAFKA_HOST, 
groupId: group_id, //group1 (or) group2 (or) group3
autoCommit: true,
sessionTimeout: 15000,
protocol: ['roundrobin'],
fromOffset: 'latest',
outOfRangeOffset: 'earliest',
migrateHLC: false,
migrateRolling: true,
fetchMaxBytes: 1024 * 1024 * 10,
commitOffsetsOnFirstJoin: true,
onRebalance: (isAlreadyMember, callback) => {
log.info({"ALREADY_MEMBER_isAlreadyMember": isAlreadyMember});
callback();
}
};
const consumerGroup = new ConsumerGroup(options, process.env.KAFKA_TOPIC);
// On receiving message
consumerGroup.on("message", handMessage); //handMessage is where the message has been handled
// On error receiving message
consumerGroup.on('error', function(err) {
log.debug({"type": "KAFKA_CONSUMER_ERROR", "msg": err});
});
// On error receiving message
consumerGroup.on('offsetOutOfRange', function(err) {
log.debug({"type": "KAFKA_CONSUMER_RANGE_ERROR", "msg": err});
});

更新-1

即使我将offsets.topic.replication.factor更新为23,我也会遇到同样的问题。当任意代理关闭时,相应的节点服务器不会使用该消息。而且,当我在broker中显示组列表时,它只显示group2group3。但是当CCD_ 14掉电时,CCD_。即使重新启动节点使用者,也不会创建group1

server.properties

broker.id=1
listeners=INSIDE://:9092,OUTSIDE://:9094
advertised.listeners=INSIDE://:9092,OUTSIDE://:9094
listener.security.protocol.map=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/kafka/kafka-logs-d3f14c9ddf0a
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=zookeeper1:2181,zookeeper2:2181,zookeeper3:2181
zookeeper.connection.timeout.ms=16000
group.initial.rebalance.delay.ms=0
inter.broker.listener.name=INSIDE
advertised.port=9094
port=9092
auto.create.topics.enable=false

更新-2

当代理关闭时,组协调器将被删除,并且不会自动重新选择。

你们能告诉我我做错了什么吗?或者我还有什么需要更新的吗?

即使我将offsets.topic.replication.factor更新为2或3,我也会遇到同样的问题。当任何代理程序关闭时,相应的节点服务器不会消耗消息

创建偏移量主题后,更改此属性不会有任何作用。

如果设置为1,那么您现在需要手动增加

假设这至少是Kafka 1.x,则需要对internalKafka主题的HA进行一些更改。考虑server.properties中的以下片段。复制的默认值设置为1。在您的情况下,对于3个经纪人,将其设置为2可能是一个很好的开始。

# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1

添加

据我所知,每个消费者群体都有自己的群体协调员。因此,如果有多个组从一个主题消费,则可以为该主题有多个协调员(不同的代理)。代理可以充当多个消费者组的group coordinator。但对于一个消费者群体来说,只有一个经纪人充当协调人。对于特定的消费者群体,我们可以使用以下命令检查哪个经纪人是协调人:

./kafka-consumer-groups.sh --bootstrap-server <broker-host>:9092 --describe --group <consumer-group> --state 

如果协调器失败,则会选择其他某个代理作为协调器。故障切换策略将在第10节中详细说明。

最新更新