How do I get my Kafka brokers to communicate with each other?



I am trying to run two Kafka Docker containers that talk to each other, but the first Kafka broker to come up cannot find the second one. Here is my docker-compose.yml:

version: '3.8'
services:
  zookeeper:
    image: "bitnami/zookeeper:latest"
    ports:
      - "2181:2181"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
      - ZOOKEEPER_CLIENT_PORT=2181
      - ZOOKEEPER_TICK_TIME=2000
    tmpfs: "/datalog"
  kafka-1:
    image: "bitnami/kafka:latest"
    container_name: "kafka-1"
    ports:
      - "9094:9094"
    depends_on:
      - zookeeper
    restart: always
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_LISTENERS=PLAINTEXT://:9094
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9094
      - KAFKA_DEFAULT_REPLICATION_FACTOR=2
      - KAFKA_NUM_PARTITIONS=2
      - ALLOW_PLAINTEXT_LISTENER=yes
  kafka-2:
    image: "bitnami/kafka:latest"
    container_name: "kafka-2"
    ports:
      - "9095:9095"
    depends_on:
      - zookeeper
    restart: always
    environment:
      - KAFKA_BROKER_ID=2
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_LISTENERS=PLAINTEXT://:9095
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9095
      - KAFKA_DEFAULT_REPLICATION_FACTOR=2
      - KAFKA_NUM_PARTITIONS=2
      - ALLOW_PLAINTEXT_LISTENER=yes

The Kafka containers repeatedly print:

kafka-2                      | [2022-10-24 15:41:47,131] INFO [Controller id=2, targetBrokerId=1] Client requested connection close from node 1 (org.apache.kafka.clients.NetworkClient)
kafka-2                      | [2022-10-24 15:41:47,232] INFO [Controller id=2, targetBrokerId=1] Node 1 disconnected. (org.apache.kafka.clients.NetworkClient)
kafka-2                      | [2022-10-24 15:41:47,232] WARN [Controller id=2, targetBrokerId=1] Connection to node 1 (/127.0.0.1:9094) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
kafka-2                      | [2022-10-24 15:41:47,232] WARN [RequestSendThread controllerId=2] Controller 2's connection to broker 127.0.0.1:9094 (id: 1 rack: null) was unsuccessful (kafka.controller.RequestSendThread)
kafka-2                      | java.io.IOException: Connection to 127.0.0.1:9094 (id: 1 rack: null) failed.
kafka-2                      |  at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:70)
kafka-2                      |  at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:291)
kafka-2                      |  at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:245)
kafka-2                      |  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)

When I try to send a message with kafkajs, I get the error: KafkaJSNonRetriableError: Replication-factor is invalid

Edit:

Using the suggestion below:

version: '3.8'
services:
  dynamodb-local-express-bp:
    command: "-jar DynamoDBLocal.jar -sharedDb -dbPath ./data"
    image: "amazon/dynamodb-local:latest"
    container_name: dynamodb-local-express-bp
    ports:
      - "8000:8000"
    volumes:
      - "./docker/dynamodb:/home/dynamodblocal/data"
    working_dir: /home/dynamodblocal
  zookeeper:
    image: "bitnami/zookeeper:latest"
    ports:
      - "2181:2181"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
      - ZOOKEEPER_CLIENT_PORT=2181
      - ZOOKEEPER_TICK_TIME=2000
    tmpfs: "/datalog"
  kafka-1:
    image: "bitnami/kafka:latest"
    container_name: "kafka-1"
    expose:
      - 9092
    ports:
      - "9094:9094"
    depends_on:
      - zookeeper
    restart: always
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9094,INTERNAL://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9094,INTERNAL://kafka-1:9092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,INTERNAL:PLAINTEXT
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL
      - KAFKA_CFG_DEFAULT_REPLICATION_FACTOR=2
      - KAFKA_CFG_NUM_PARTITIONS=2
      - ALLOW_PLAINTEXT_LISTENER=yes
  kafka-2:
    image: "bitnami/kafka:latest"
    container_name: "kafka-2"
    ports:
      - "9095:9095"
    depends_on:
      - zookeeper
    restart: always
    environment:
      - KAFKA_BROKER_ID=2
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9095,INTERNAL://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9095,INTERNAL://kafka-1:9092
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,INTERNAL:PLAINTEXT
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL
      - KAFKA_CFG_DEFAULT_REPLICATION_FACTOR=2
      - KAFKA_CFG_NUM_PARTITIONS=2
      - ALLOW_PLAINTEXT_LISTENER=yes
      # - KAFKA_BROKER_ID=2
      # - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      # - KAFKA_LISTENERS=PLAINTEXT://:9095
      # - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9095
      # - KAFKA_DEFAULT_REPLICATION_FACTOR=2
      # - KAFKA_NUM_PARTITIONS=2
      # - ALLOW_PLAINTEXT_LISTENER=yes

I get the following error message:

[2022-10-24 18:59:06,286] ERROR [KafkaServer id=2] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
kafka-2                      | java.lang.IllegalArgumentException: requirement failed: Configured end points kafka-1:9092 in advertised listeners are already registered by broker 1
kafka-2                      |  at scala.Predef$.require(Predef.scala:281)
kafka-2                      |  at kafka.server.KafkaServer.$anonfun$createBrokerInfo$3(KafkaServer.scala:512)
kafka-2                      |  at kafka.server.KafkaServer.$anonfun$createBrokerInfo$3$adapted(KafkaServer.scala:510)
kafka-2                      |  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
kafka-2                      |  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
kafka-2                      |  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
kafka-2                      |  at kafka.server.KafkaServer.createBrokerInfo(KafkaServer.scala:510)
kafka-2                      |  at kafka.server.KafkaServer.startup(KafkaServer.scala:331)
kafka-2                      |  at kafka.Kafka$.main(Kafka.scala:109)
kafka-2                      |  at kafka.Kafka.main(Kafka.scala)

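The problem is caused by the bridge network that Docker creates for you. Inside a container, 127.0.0.1 refers to that container itself, so a broker that advertises 127.0.0.1 cannot be reached by the other broker.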

There is a great article on what is happening here and how exactly Kafka brokers discover each other.

You need an additional advertised listener. 127.0.0.1 is the one advertised to the host. You need a second one for traffic inside the Docker network.

Add or change these settings in your docker-compose.yml:

kafka-1:
  ...
  expose:
    - 9092
  environment:
    - KAFKA_BROKER_ID=1
    - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
    - KAFKA_CFG_LISTENERS=PLAINTEXT://:9094,INTERNAL://:9092
    - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9094,INTERNAL://kafka-1:9092
    - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,INTERNAL:PLAINTEXT
    - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL
    - KAFKA_CFG_DEFAULT_REPLICATION_FACTOR=2
    - KAFKA_CFG_NUM_PARTITIONS=2
    - ALLOW_PLAINTEXT_LISTENER=yes

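kafka-2 is analogous: use the same settings with its own host port and its own hostname in the internal advertised listener (INTERNAL://kafka-2:9092). If both brokers advertise kafka-1:9092, the second one fails at startup with the "already registered by broker 1" error shown above.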

Also, you are using the bitnami images, which expect Kafka settings to be passed as environment variables prefixed with KAFKA_CFG_. That is another pitfall.
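
For reference, here is a sketch of what the analogous kafka-2 service might look like. It assumes the service and container are both named kafka-2 (so that hostname resolves on the Compose network) and that host port 9095 is kept from the question. The unprefixed KAFKA_BROKER_ID and KAFKA_ZOOKEEPER_CONNECT lines are copied unchanged from the kafka-1 snippet above. Treat it as a starting point, not a verified configuration.

```yaml
kafka-2:
  image: "bitnami/kafka:latest"
  container_name: "kafka-2"
  expose:
    - 9092                      # internal listener, reachable only inside the Docker network
  ports:
    - "9095:9095"               # external listener, published to the host
  depends_on:
    - zookeeper
  restart: always
  environment:
    - KAFKA_BROKER_ID=2
    - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
    - KAFKA_CFG_LISTENERS=PLAINTEXT://:9095,INTERNAL://:9092
    # The INTERNAL endpoint must name this broker (kafka-2), not kafka-1;
    # every broker has to advertise endpoints that are unique in the cluster.
    - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9095,INTERNAL://kafka-2:9092
    - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,INTERNAL:PLAINTEXT
    - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL
    - KAFKA_CFG_DEFAULT_REPLICATION_FACTOR=2
    - KAFKA_CFG_NUM_PARTITIONS=2
    - ALLOW_PLAINTEXT_LISTENER=yes
```

With this layout, clients on the host connect to 127.0.0.1:9094 and 127.0.0.1:9095, while the brokers reach each other at kafka-1:9092 and kafka-2:9092.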
