我在多节点环境中使用kafka来测试故障切换的工作方式。实际上,我有两个虚拟机,每个虚拟机中有一个kafka节点,而两个虚拟中的一个虚拟机中只有一个zookeeper。我知道没有最佳的生产配置,但这只是为了训练自己,更好地理解事情。
以下是我的配置:VM1 ip:192.168.64.2(只有一个broker.id=2(VM2 ip:192.168.64.3(zookeeper在这里运行,broker与broker.id=1(
我通过podman启动kafka(这不是podman的问题,一切都配置好了(
在VM1:上
podman run -e KAFKA_BROKER_ID=2 -e KAFKA_ZOOKEEPER_CONNECT=192.168.64.3:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9093,PLAINTEXT_HOST://192.168.64.2:29092 -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=2 -e KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT_HOST -e UNCLEAN_LEADER_ELECTION_ENABLE=true --pod zookeeper-kafka confluentinc/cp-kafka:latest
在VM2:上
podman run -e KAFKA_BROKER_ID=1 -e KAFKA_ZOOKEEPER_CONNECT=localhost:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092,PLAINTEXT_HOST://192.168.64.3:29092 -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=2 -e KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT_HOST -e UNCLEAN_LEADER_ELECTION_ENABLE=true --pod zookeeper-kafka confluentinc/cp-kafka:latest
现在我创建一个主题"订单":
./kafktopics --create --bootstrap-server 192.168.64.2:29092,192.168.64.3:29092 --replication-factor 2 --partitions 2 --topic orders
然后我创建了一个生产者:
./kafkconsole-producer --broker-list 192.168.64.2:29092,192.168.64.3:29092 --topic orders
消费者:
./kafka-console-consumer --bootstrap-server 192.168.64.2:29092,192.168.64.3:29092 --topic orders```
以下是我尝试做的事情:
- 启动Zookeeper,2个kafka节点,创建;订单;主题、生产者和消费者(好吧,一切都很好(
- 在我的生产者中发送消息,并检查消费者是否收到消息(OK(
- 杀死VM2上的kafka节点(OK(
- 在我的生产者中再次发送消息,并检查消费者是否收到消息(好吧,VM1上的代理可以分发消息(
- 重新启动VM2上被杀死的kafka节点(好的。之后我可以看到这两个分区都有VM1作为引导(
- 在我的生产者中再次发送消息,并检查消费者接收id(OK(
- 杀死VM1上的kafka节点,它现在是2个分区的领导者(OK(
- 在我的生产者中再次发送消息,并检查消费者是否收到消息(好吧,VM2上的代理可以分发消息(
- 重新启动VM1上被杀死的kafka节点(好的。之后我可以看到这两个分区都有VM2作为引导(
- 在我的生产者中再次发送消息,并检查消费者是否收到(OK(
- 再次杀死VM2上的kafka节点(OK(
- 在我的生产者中再次发送消息,并检查消费者是否收到(不正常(:在这里,生产者无法发送消息,而我的消费者从未收到消息!过了几段时间,我的制作人出现了一个错误:
ERROR Error when sending message to topic orders with key: null, value: 9 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for orders-0:120000 ms has passed since batch creation
我真的不明白这里发生了什么?它一开始工作得很好,但在启动/停止/启动broker后,它开始失败!我需要准确地说,我永远不会在同一时间杀死2个经纪人。
你能解释一下我在这里缺了什么吗?
谢谢大家:(
编辑
完成以下评论:
@OneCricketer,我把你评论的答案放在这里。
启动时一切正常:
Topic: orders TopicId: I3hMNln9TpSuo76xHSpMXQ PartitionCount: 2 ReplicationFactor: 2 Configs:
Topic: orders Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: orders Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
杀死VM2:后
Topic: orders TopicId: I3hMNln9TpSuo76xHSpMXQ PartitionCount: 2 ReplicationFactor: 2 Configs:
Topic: orders Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2
Topic: orders Partition: 1 Leader: 2 Replicas: 1,2 Isr: 2
杀死VM1:后
Topic: orders TopicId: I3hMNln9TpSuo76xHSpMXQ PartitionCount: 2 ReplicationFactor: 2 Configs:
Topic: orders Partition: 0 Leader: 1 Replicas: 2,1 Isr: 1
Topic: orders Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1
杀死VM2:后
Topic: orders TopicId: I3hMNln9TpSuo76xHSpMXQ PartitionCount: 2 ReplicationFactor: 2 Configs:
Topic: orders Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2
Topic: orders Partition: 1 Leader: 2 Replicas: 1,2 Isr: 2
(从这里开始,生产者不能再发布消息了(
经过长时间的阅读和调查,我终于找到了问题的答案。
只有2个代理,我需要以下配置
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=2
KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS=1
问题是偏移主题的默认分区数。(如果我记得很清楚的话,是50分中的49分(。
现在只有一个分区和两个副本,一切都很好,我可以启动/停止/启动/停止。。。。我的经纪人尽可能多的时间,而另一个经纪人带头并继续处理我的消息。
希望这能在未来帮助到别人。