Kafka on Kubernetes - clients cannot retrieve metadata



I have a Kafka cluster running on Kubernetes, along with ZooKeeper on Kubernetes. As described in this answer, I have configured an internal broker port as well as an advertised external port for clients:

listener.security.protocol.map=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
inter.broker.listener.name=PLAINTEXT
listeners=PLAINTEXT://:29092,PLAINTEXT_HOST://0.0.0.0:9093
advertised.listeners=PLAINTEXT://:29092,PLAINTEXT_HOST://{EXTERNAL-IP-ADDRESS}:9093
zookeeper.connect=zk-cs.analytics.svc:2181

I expect inter-broker communication to happen on 29092, and external clients to connect on port 9093. I have a single external IP for the whole Kubernetes service, which means this is the only external IP the Kafka brokers should advertise. As I understand it, the Kubernetes load balancer routes any request to this IP to one of my brokers.
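For context, a single LoadBalancer Service in front of all broker pods would look roughly like the sketch below (the name kafka-external and the labels are placeholders for illustration, not the actual manifest):

# Hypothetical sketch of a single external Service fronting all brokers.
# The name "kafka-external" and the label selector are assumptions;
# the key point is that ONE LoadBalancer IP is shared by ALL broker pods.
apiVersion: v1
kind: Service
metadata:
  name: kafka-external        # assumed name
  namespace: analytics
spec:
  type: LoadBalancer
  selector:
    app: kafka                # assumed label matching every broker pod
  ports:
  - name: external
    port: 9093
    targetPort: 9093
    protocol: TCP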

I have verified that my Kafka brokers are correctly registered in ZooKeeper:

get /brokers/ids/0
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT","PLAINTEXT_HOST":"PLAINTEXT"},"endpoints":["PLAINTEXT://kafka-0.kafka-hs.analytics.svc.cluster.local:29092","PLAINTEXT_HOST://{EXTERNAL-IP-ADDRESS}"],"jmx_port":-1,"host":"kafka-0.kafka-hs.analytics.svc.cluster.local","timestamp":"1525689391350","port":29092,"version":4}
cZxid = 0x90000029f
ctime = Mon May 07 12:36:31 CEST 2018
mZxid = 0x90000029f
mtime = Mon May 07 12:36:31 CEST 2018
pZxid = 0x90000029f
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x1632acfab520009
dataLength = 344
numChildren = 0

Creating a topic looks fine to me in the logs shown below.

Leader:

[2018-05-07 10:41:12,760] DEBUG [TopicChangeListener on Controller 0]: Topic change listener fired for path /brokers/topics with children test-topic (kafka.controller.PartitionStateMachine$TopicChangeListener)
[2018-05-07 10:41:12,767] INFO [TopicChangeListener on Controller 0]: New topics: [Set(test-topic)], deleted topics: [Set()], new partition replica assignment [Map([test-topic,0] -> List(0, 1))] (kafka.controller.PartitionStateMachine$TopicChangeListener)
[2018-05-07 10:41:12,768] INFO [Controller 0]: New topic creation callback for [test-topic,0] (kafka.controller.KafkaController)
[2018-05-07 10:41:12,770] INFO [Controller 0]: New partition creation callback for [test-topic,0] (kafka.controller.KafkaController)
[2018-05-07 10:41:12,771] INFO [Partition state machine on Controller 0]: Invoking state change to NewPartition for partitions [test-topic,0] (kafka.controller.PartitionStateMachine)
[2018-05-07 10:41:12,772] TRACE Controller 0 epoch 12 changed partition [test-topic,0] state from NonExistentPartition to NewPartition with assigned replicas 0,1 (state.change.logger)
[2018-05-07 10:41:12,774] INFO [Replica state machine on controller 0]: Invoking state change to NewReplica for replicas [Topic=test-topic,Partition=0,Replica=0],[Topic=test-topic,Partition=0,Replica=1] (kafka.controller.ReplicaStateMachine)
[2018-05-07 10:41:12,778] TRACE Controller 0 epoch 12 changed state of replica 0 for partition [test-topic,0] from NonExistentReplica to NewReplica (state.change.logger)
[2018-05-07 10:41:12,779] TRACE Controller 0 epoch 12 changed state of replica 1 for partition [test-topic,0] from NonExistentReplica to NewReplica (state.change.logger)
[2018-05-07 10:41:12,779] INFO [Partition state machine on Controller 0]: Invoking state change to OnlinePartition for partitions [test-topic,0] (kafka.controller.PartitionStateMachine)
[2018-05-07 10:41:12,780] DEBUG [Partition state machine on Controller 0]: Live assigned replicas for partition [test-topic,0] are: [List(0, 1)] (kafka.controller.PartitionStateMachine)
[2018-05-07 10:41:12,782] DEBUG [Partition state machine on Controller 0]: Initializing leader and isr for partition [test-topic,0] to (Leader:0,ISR:0,1,LeaderEpoch:0,ControllerEpoch:12) (kafka.controller.PartitionStateMachine)
[2018-05-07 10:41:12,805] TRACE Controller 0 epoch 12 changed partition [test-topic,0] from NewPartition to OnlinePartition with leader 0 (state.change.logger)
[2018-05-07 10:41:12,806] TRACE Controller 0 epoch 12 sending become-follower LeaderAndIsr request (Leader:0,ISR:0,1,LeaderEpoch:0,ControllerEpoch:12) to broker 1 for partition [test-topic,0] (state.change.logger)
[2018-05-07 10:41:12,809] TRACE Controller 0 epoch 12 sending become-leader LeaderAndIsr request (Leader:0,ISR:0,1,LeaderEpoch:0,ControllerEpoch:12) to broker 0 for partition [test-topic,0] (state.change.logger)
[2018-05-07 10:41:12,810] TRACE Controller 0 epoch 12 sending UpdateMetadata request (Leader:0,ISR:0,1,LeaderEpoch:0,ControllerEpoch:12) to brokers Set(0, 1, 2, 3, 4) for partition test-topic-0 (state.change.logger)
[2018-05-07 10:41:12,811] INFO [Replica state machine on controller 0]: Invoking state change to OnlineReplica for replicas [Topic=test-topic,Partition=0,Replica=0],[Topic=test-topic,Partition=0,Replica=1] (kafka.controller.ReplicaStateMachine)
[2018-05-07 10:41:12,812] TRACE Controller 0 epoch 12 changed state of replica 0 for partition [test-topic,0] from NewReplica to OnlineReplica (state.change.logger)
[2018-05-07 10:41:12,813] TRACE Controller 0 epoch 12 changed state of replica 1 for partition [test-topic,0] from NewReplica to OnlineReplica (state.change.logger)
[2018-05-07 10:41:12,813] TRACE Broker 0 received LeaderAndIsr request PartitionState(controllerEpoch=12, leader=0, leaderEpoch=0, isr=[0, 1], zkVersion=0, replicas=[0, 1]) correlation id 5 from controller 0 epoch 12 for partition [test-topic,0] (state.change.logger)
[2018-05-07 10:41:12,813] TRACE Broker 0 received LeaderAndIsr request PartitionState(controllerEpoch=12, leader=0, leaderEpoch=0, isr=[0, 1], zkVersion=0, replicas=[0, 1]) correlation id 4 from controller 0 epoch 12 for partition [test-topic,0] (state.change.logger)
[2018-05-07 10:41:12,816] TRACE Broker 0 cached leader info (LeaderAndIsrInfo:(Leader:0,ISR:0,1,LeaderEpoch:0,ControllerEpoch:12),ReplicationFactor:2),AllReplicas:0,1) for partition test-topic-0 in response to UpdateMetadata request sent by controller 0 epoch 12 with correlation id 2 (state.change.logger)
[2018-05-07 10:41:12,817] TRACE Controller 0 epoch 12 received response {error_code=0} for a request sent to broker kafka-2.kafka-hs.analytics.svc.cluster.local:29092 (id: 2 rack: null) (state.change.logger)
[2018-05-07 10:41:12,823] TRACE Controller 0 epoch 12 received response {error_code=0} for a request sent to broker kafka-3.kafka-hs.analytics.svc.cluster.local:29092 (id: 3 rack: null) (state.change.logger)
[2018-05-07 10:41:12,823] TRACE Controller 0 epoch 12 received response {error_code=0} for a request sent to broker kafka-4.kafka-hs.analytics.svc.cluster.local:29092 (id: 4 rack: null) (state.change.logger)
[2018-05-07 10:41:12,827] TRACE Broker 0 handling LeaderAndIsr request correlationId 4 from controller 0 epoch 12 starting the become-leader transition for partition test-topic-0 (state.change.logger)
[2018-05-07 10:41:12,828] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions test-topic-0 (kafka.server.ReplicaFetcherManager)
[2018-05-07 10:41:12,852] INFO Completed load of log test-topic-0 with 1 log segments and log end offset 0 in 17 ms (kafka.log.Log)
[2018-05-07 10:41:12,853] INFO Created log for partition [test-topic,0] in /tmp/kafka-logs with properties {compression.type -> producer, message.format.version -> 0.10.2-IV0, file.delete.delay.ms -> 60000, max.message.bytes -> 1000012, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> true, retention.bytes -> -1, delete.retention.ms -> 86400000, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, segment.ms -> 604800000, segment.bytes -> 1073741824, retention.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages -> 9223372036854775807}. (kafka.log.LogManager)
[2018-05-07 10:41:12,853] INFO Partition [test-topic,0] on broker 0: No checkpointed highwatermark is found for partition test-topic-0 (kafka.cluster.Partition)
[2018-05-07 10:41:12,861] TRACE Broker 0 stopped fetchers as part of become-leader request from controller 0 epoch 12 with correlation id 4 for partition test-topic-0 (state.change.logger)
[2018-05-07 10:41:12,861] TRACE Broker 0 completed LeaderAndIsr request correlationId 4 from controller 0 epoch 12 for the become-leader transition for partition test-topic-0 (state.change.logger)
[2018-05-07 10:41:12,864] WARN Broker 0 ignoring LeaderAndIsr request from controller 0 with correlation id 5 epoch 12 for partition [test-topic,0] since its associated leader epoch 0 is not higher than the current leader epoch 0 (state.change.logger)
[2018-05-07 10:41:12,865] TRACE Controller 0 epoch 12 received response {error_code=0,partitions=[{topic=test-topic,partition=0,error_code=11}]} for a request sent to broker kafka-0.kafka-hs.analytics.svc.cluster.local:29092 (id: 0 rack: null) (state.change.logger)
[2018-05-07 10:41:12,865] TRACE Controller 0 epoch 12 received response {error_code=0,partitions=[{topic=test-topic,partition=0,error_code=0}]} for a request sent to broker kafka-1.kafka-hs.analytics.svc.cluster.local:29092 (id: 1 rack: null) (state.change.logger)
[2018-05-07 10:41:12,867] TRACE Broker 0 cached leader info (LeaderAndIsrInfo:(Leader:0,ISR:0,1,LeaderEpoch:0,ControllerEpoch:12),ReplicationFactor:2),AllReplicas:0,1) for partition test-topic-0 in response to UpdateMetadata request sent by controller 0 epoch 12 with correlation id 6 (state.change.logger)
[2018-05-07 10:41:12,867] TRACE Controller 0 epoch 12 received response {error_code=0} for a request sent to broker kafka-0.kafka-hs.analytics.svc.cluster.local:29092 (id: 0 rack: null) (state.change.logger)
[2018-05-07 10:41:12,867] TRACE Broker 0 cached leader info (LeaderAndIsrInfo:(Leader:0,ISR:0,1,LeaderEpoch:0,ControllerEpoch:12),ReplicationFactor:2),AllReplicas:0,1) for partition test-topic-0 in response to UpdateMetadata request sent by controller 0 epoch 12 with correlation id 5 (state.change.logger)
[2018-05-07 10:41:12,868] TRACE Controller 0 epoch 12 received response {error_code=0} for a request sent to broker kafka-1.kafka-hs.analytics.svc.cluster.local:29092 (id: 1 rack: null) (state.change.logger)
[2018-05-07 10:41:26,213] INFO Partition [test-topic,0] on broker 0: Shrinking ISR for partition [test-topic,0] from 0,1 to 0 (kafka.cluster.Partition)
[2018-05-07 10:41:28,721] DEBUG [IsrChangeNotificationListener on Controller 0]: ISR change notification listener fired (kafka.controller.IsrChangeNotificationListener)
[2018-05-07 10:41:28,735] DEBUG [IsrChangeNotificationListener on Controller 0]: Sending MetadataRequest to Brokers:ArrayBuffer(0, 1, 2, 3, 4) for TopicAndPartitions:Set([test-topic,0], [__consumer_offsets,30], [__consumer_offsets,6]) (kafka.controller.IsrChangeNotificationListener)
[2018-05-07 10:41:28,735] INFO Leader not yet assigned for partition [__consumer_offsets,30]. Skip sending UpdateMetadataRequest. (kafka.controller.ControllerBrokerRequestBatch)
[2018-05-07 10:41:28,735] INFO Leader not yet assigned for partition [__consumer_offsets,6]. Skip sending UpdateMetadataRequest. (kafka.controller.ControllerBrokerRequestBatch)
[2018-05-07 10:41:28,735] TRACE Controller 0 epoch 12 sending UpdateMetadata request (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:12) to brokers Set(0, 1, 2, 3, 4) for partition test-topic-0 (state.change.logger)
[2018-05-07 10:41:28,739] TRACE Broker 0 cached leader info (LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:12),ReplicationFactor:2),AllReplicas:0,1) for partition test-topic-0 in response to UpdateMetadata request sent by controller 0 epoch 12 with correlation id 6 (state.change.logger)
[2018-05-07 10:41:28,739] TRACE Broker 0 cached leader info (LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:12),ReplicationFactor:2),AllReplicas:0,1) for partition test-topic-0 in response to UpdateMetadata request sent by controller 0 epoch 12 with correlation id 3 (state.change.logger)
[2018-05-07 10:41:28,739] TRACE Controller 0 epoch 12 received response {error_code=0} for a request sent to broker kafka-1.kafka-hs.analytics.svc.cluster.local:29092 (id: 1 rack: null) (state.change.logger)
[2018-05-07 10:41:28,740] TRACE Broker 0 cached leader info (LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:12),ReplicationFactor:2),AllReplicas:0,1) for partition test-topic-0 in response to UpdateMetadata request sent by controller 0 epoch 12 with correlation id 7 (state.change.logger)
[2018-05-07 10:41:28,740] TRACE Controller 0 epoch 12 received response {error_code=0} for a request sent to broker kafka-2.kafka-hs.analytics.svc.cluster.local:29092 (id: 2 rack: null) (state.change.logger)
[2018-05-07 10:41:28,740] TRACE Controller 0 epoch 12 received response {error_code=0} for a request sent to broker kafka-3.kafka-hs.analytics.svc.cluster.local:29092 (id: 3 rack: null) (state.change.logger)
[2018-05-07 10:41:28,740] TRACE Controller 0 epoch 12 received response {error_code=0} for a request sent to broker kafka-0.kafka-hs.analytics.svc.cluster.local:29092 (id: 0 rack: null) (state.change.logger)
[2018-05-07 10:41:28,741] TRACE Controller 0 epoch 12 received response {error_code=0} for a request sent to broker kafka-4.kafka-hs.analytics.svc.cluster.local:29092 (id: 4 rack: null) (state.change.logger)
[2018-05-07 10:41:28,746] DEBUG [IsrChangeNotificationListener on Controller 0]: ISR change notification listener fired (kafka.controller.IsrChangeNotificationListener)
[2018-05-07 10:41:36,297] TRACE [Controller 0]: checking need to trigger partition rebalance (kafka.controller.KafkaController)
[2018-05-07 10:41:36,298] DEBUG [Controller 0]: preferred replicas by broker Map(0 -> Map([test-topic,0] -> List(0, 1))) (kafka.controller.KafkaController)
[2018-05-07 10:41:36,302] DEBUG [Controller 0]: topics not in preferred replica Map() (kafka.controller.KafkaController)
[2018-05-07 10:41:36,303] TRACE [Controller 0]: leader imbalance ratio for broker 0 is 0.000000 (kafka.controller.KafkaController)

Replica #1:

[2018-05-07 10:41:12,822] TRACE Broker 1 cached leader info (LeaderAndIsrInfo:(Leader:0,ISR:0,1,LeaderEpoch:0,ControllerEpoch:12),ReplicationFactor:2),AllReplicas:0,1) for partition test-topic-0 in response to UpdateMetadata request sent by controller 0 epoch 12 with correlation id 3 (state.change.logger)
[2018-05-07 10:41:28,739] TRACE Broker 1 cached leader info (LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:12),ReplicationFactor:2),AllReplicas:0,1) for partition test-topic-0 in response to UpdateMetadata request sent by controller 0 epoch 12 with correlation id 4 (state.change.logger)

Replica #2:

[2018-05-07 10:41:12,823] TRACE Broker 2 cached leader info (LeaderAndIsrInfo:(Leader:0,ISR:0,1,LeaderEpoch:0,ControllerEpoch:12),ReplicationFactor:2),AllReplicas:0,1) for partition test-topic-0 in response to UpdateMetadata request sent by controller 0 epoch 12 with correlation id 1 (state.change.logger)
[2018-05-07 10:41:28,740] TRACE Broker 2 cached leader info (LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:12),ReplicationFactor:2),AllReplicas:0,1) for partition test-topic-0 in response to UpdateMetadata request sent by controller 0 epoch 12 with correlation id 2 (state.change.logger)

However, whenever I connect a console producer to the cluster, I get the following errors:

.\kafka-console-producer.bat --broker-list {EXTERNAL-IP-ADDRESS}:9093 --topic test-topic --property parse.key=true --property key.separator=:
>testKey:23487239847237894asduhzdfhzusfhhsdf
[2018-05-07 12:42:58,395] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 2 : {test-topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2018-05-07 12:42:58,512] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {test-topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2018-05-07 12:42:58,641] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 4 : {test-topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2018-05-07 12:42:58,765] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 5 : {test-topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
[2018-05-07 12:42:58,886] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 6 : {test-topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

Is it a problem that the Kubernetes service exposes only a single external IP address while all of my Kafka brokers advertise that same IP? Is there a solution?

I expect inter-broker communication to happen on 29092.

Yes, they use 29092 for internal communication.

External clients should be able to connect on port 9093. I have a single external IP for the whole Kubernetes service, which means this is the only external IP the Kafka brokers should advertise. As I understand it, the Kubernetes load balancer routes any request to this IP to one of my brokers.

Yes, Kubernetes routes all traffic for that Service to one of your brokers, and that is exactly the problem.

Internally, you use a headless Service to discover the addresses of the Kafka brokers, so they are reachable under DNS names of the form kafka-[_NUM_OF_THE_REPLICA_]._SERVICE_NAME_, and that works fine.
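As an illustration only, such a headless Service looks roughly like this (the name kafka-hs and the namespace analytics are taken from the broker endpoints in your log output; the selector and port name are assumptions):

# Rough sketch of the headless Service that gives each broker its own
# DNS name (kafka-0.kafka-hs.analytics.svc.cluster.local, ...).
apiVersion: v1
kind: Service
metadata:
  name: kafka-hs
  namespace: analytics
spec:
  clusterIP: None             # headless: no load balancing, per-pod DNS records
  selector:
    app: kafka                # assumed pod label
  ports:
  - name: broker
    port: 29092
    protocol: TCP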

To reach the brokers from outside the cluster, you need to expose every replica on a different address or port. At the moment, however, you only have a single Service, which load-balances requests across all of them.

To fix this, create a separate Service for each replica and use their external addresses as the EXTERNAL-IP-ADDRESSES in your configuration.

Here is an example from an issue in a GitHub repository that provides a Kafka cluster configuration for Kubernetes:

---
apiVersion: v1
kind: Service
metadata:
  name: kafka-es-0
spec:
  ports:
  - port: 9092
    name: kafka-port
    protocol: TCP
  selector:
    pod-name: kafka-0
  type: LoadBalancer
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-es-1
spec:
  ports:
  - port: 9092
    name: kafka-port
    protocol: TCP
  selector:
    pod-name: kafka-1
  type: LoadBalancer
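With per-replica Services in place, each broker then advertises the external IP of its own Service instead of the shared one. A hedged sketch for broker 0, modeled on the config from the question ({EXTERNAL-IP-OF-KAFKA-ES-0} is a placeholder for the external IP assigned to kafka-es-0, and the external port should match whatever port that Service exposes):

# server.properties for kafka-0 (sketch, not the exact original config)
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
inter.broker.listener.name=PLAINTEXT
listeners=PLAINTEXT://:29092,PLAINTEXT_HOST://0.0.0.0:9093
# Each broker advertises the external IP of ITS OWN Service (kafka-es-0 here),
# not the IP of a single Service shared by all brokers.
advertised.listeners=PLAINTEXT://kafka-0.kafka-hs.analytics.svc.cluster.local:29092,PLAINTEXT_HOST://{EXTERNAL-IP-OF-KAFKA-ES-0}:9093
zookeeper.connect=zk-cs.analytics.svc:2181

Note that the pod-name selector in the example assumes the broker pods carry a matching label; with a StatefulSet, Kubernetes also adds the built-in statefulset.kubernetes.io/pod-name label to each pod, which can be used the same way.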
