Kafka Connect fails after one broker node goes down



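I have one ZooKeeper node, a cluster of 2 brokers, and a cluster of 2 Connect workers. The Connect cluster works well, and the two workers fail over to each other fully. But when one of the broker nodes goes down, the connector logs that it cannot connect to the broker and stops working. I checked the connector's topic, and nothing has been pushed to it since the problem occurred.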

version: '2'
volumes:
  zoo-data:
  zoo-log:
  broker-data:
  broker-log:
  broker2-data:
  broker2-log:
  connecotr-data:
  connecotr2-data:
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.2.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
      - "2888:2888"
      - "3888:3888"
    volumes:
      - zoo-data:/var/lib/zookeeper/data
      - zoo-log:/var/lib/zookeeper/log
    environment:
      # ZOOKEEPER_CLIENT_PORT: 22181
      ZOOKEEPER_TICK_TIME: 2000
      KAFKA_OPTS: "-Dzookeeper.4lw.commands.whitelist=*"
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_INIT_LIMIT: 5
      ZOOKEEPER_SYNC_LIMIT: 2
      # ZOOKEEPER_SERVERS: zookeeper:2888:3888
  broker:
    image: confluentinc/cp-server:6.2.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    volumes:
      - broker-data:/var/lib/kafka/data
      - broker-log:/var/lib/kafka/log
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://:9092
      DELETE_TOPIC_ENABLE: 'true'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 2
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 2
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 2
      BOOTSTRAP_SERVERS: broker2:29094,broker:29092
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081

  broker2:
    image: confluentinc/cp-server:6.2.0
    hostname: broker2
    container_name: broker2
    depends_on:
      - zookeeper
    ports:
      - "9094:9094"
      - "9103:9103"
    volumes:
      - broker2-data:/var/lib/kafka/data
      - broker2-log:/var/lib/kafka/log
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker2:29094,PLAINTEXT_HOST://localhost:9094
      KAFKA_LISTENERS: PLAINTEXT://broker2:29094,PLAINTEXT_HOST://:9094
      DELETE_TOPIC_ENABLE: 'true'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 2
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 2
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 2
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 2
      BOOTSTRAP_SERVERS: broker2:29094,broker:29092
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      # BOOTSTRAP_SERVERS: broker2:29094,broker:29092

  kafdrop:
    image: obsidiandynamics/kafdrop:latest
    container_name: kafdrop
    ports:
      - 9000:9000
    environment:
      - KAFKA_BROKERCONNECT=broker:29092,broker2:29094
    depends_on:
      - broker
  schema-registry:
    image: confluentinc/cp-schema-registry:6.2.0
    hostname: schema-registry
    container_name: schema-registry
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092,broker2:29094'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
  connect:
    image: cnfldemos/cp-server-connect-datagen:0.5.0-6.2.0
    # image: confluentinc/cp-kafka-connect:5.4.0
    hostname: connect
    container_name: connect
    depends_on:
      - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092,broker2:29094'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-mysql
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect=DEBUG"
      # CLASSPATH required due to CC-2422
      # CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-6.2.0.jar
      # CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      # CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PLUGIN_PATH: '/usr/share/java'
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
    volumes:
      - connecotr-data:/etc/docker/volumnes/kafka/connector/

  connect2:
    image: cnfldemos/cp-server-connect-datagen:0.5.0-6.2.0
    # image: confluentinc/cp-kafka-connect:5.4.0
    hostname: connect2
    container_name: connect2
    depends_on:
      - schema-registry
    ports:
      - "18083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker2:29094,broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect2
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-mysql
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_LOG4J_LOGGERS: DEBUG
      # CLASSPATH required due to CC-2422
      # CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-6.2.0.jar
      # CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      # CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PLUGIN_PATH: '/usr/share/java'
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
    volumes:
      - connecotr2-data:/etc/docker/volumnes/kafka/connector/

Your Connect internal topics have a replication factor of only one.

CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-storage  # renamed - it is not only for mysql
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1

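Each of these topics therefore lives on a single broker. If the broker hosting them goes down, you should expect Connect to crash or stop working as well.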
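
As a rough sketch of the implied fix, the three storage replication factors could be raised to 2, which is the most a two-broker cluster can support. This is only an illustration under stated assumptions: the renamed config topic `docker-connect-storage` is the name suggested in this answer, and both `connect` and `connect2` would need the same values in their `environment:` sections.

```yaml
# Sketch only: goes under `environment:` for BOTH Connect workers.
# A replication factor of 2 is the maximum with two brokers.
CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-storage
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 2
CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 2
CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 2
```

Note that these settings only take effect when Connect creates the topics. Topics that already exist keep their current replication factor, so they would have to be recreated (or have their partitions reassigned) before the change helps.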
