Unable to connect to Kafka with ACLs in Docker (Docker Compose)



I am trying to set up Kafka with ACLs in Docker (docker-compose). I cannot connect to it from my Python example script. I am using the scripts from dicrectcsd.

docker-compose.yml

zookeeper:
  image: confluentinc/cp-zookeeper:5.4.1
  container_name: zookeeper
  ports:
    - '31000:31000'
  environment:
    ZOOKEEPER_CLIENT_PORT: 2181
    ZOOKEEPER_TICK_TIME: 2000
    KAFKA_OPTS: "-Dzookeeper.authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider -Djava.security.auth.login.config=/opt/security/zookeeper-server.jaas"
    KAFKA_JMX_HOSTNAME: "localhost"
    KAFKA_JMX_PORT: 31000
  volumes:
    - ./kafka/data/zookeeper/data:/data
    - ./kafka/data/zookeeper/datalog:/datalog
    - ./kafka/data/kafka1/security:/opt/security
  networks:
    - proxy
kafka:
  image: confluentinc/cp-server:5.4.1
  container_name: kafka
  ports:
    - '9092:9092'
    - '9093:9093'
    - '31001:31001'
  depends_on:
    - zookeeper
  environment:
    KAFKA_BROKER_ID: 1
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    KAFKA_INTER_BROKER_LISTENER_NAME: EXTERNAL
    KAFKA_LISTENERS: "EXTERNAL://:9092,INTERNAL://kafka:9093"
    KAFKA_ADVERTISED_LISTENERS: "EXTERNAL://:9092,INTERNAL://kafka:9093"
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "EXTERNAL:SASL_PLAINTEXT,INTERNAL:PLAINTEXT"
    KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100
#      KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
    KAFKA_LISTENER_NAME_EXTERNAL_SASL_ENABLED_MECHANISMS: PLAIN
#      KAFKA_LISTENER_NAME_INTERNAL_SASL_ENABLED_MECHANISMS: PLAIN
#      KAFKA_LISTENER_NAME_EXTERNAL_PLAIN_SASL_JAAS_CONFIG: "KafkaServer {org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin-secret' user_admin='admin-secret';} KafkaClient {org.>
#      KAFKA_LISTENER_NAME_INTERNAL_PLAIN_SASL_JAAS_CONFIG: "KafkaServer {org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin-secret' user_admin='admin-secret';} KafkaClient {org.>
#      KAFKA_ZOOKEEPER_SASL_CLIENTCONFIG: "org.apache.zookeeper.server.auth.DigestLoginModule required username='admin' password='admin-secret';};"
    KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
    KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer
#      KAFKA_AUTHORIZER_CLASS_NAME: io.confluent.kafka.security.authorizer.ConfluentServerAuthorizer
#      KAFKA_CONFLUENT_AUTHORIZER_ACCESS_RULE_PROVIDERS: "ZK_ACL,CONFLUENT"
    KAFKA_SUPER_USERS: "User:admin"
    KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "false"
    KAFKA_ZOOKEEPER_SET_ACL: "true"
    KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/security/kafka-server.jaas"
    KAFKA_JMX_HOSTNAME: "localhost"
    KAFKA_JMX_PORT: 31001
    #KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
    #CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: localhost:9093
    #CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
    #CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
    #CONFLUENT_METRICS_REPORTER_SECURITY_PROTOCOL: SASL_PLAINTEXT
    #CONFLUENT_METRICS_REPORTER_SASL_MECHANISM: PLAIN
    #CONFLUENT_METRICS_REPORTER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin-secret';"
    KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
  volumes:
    - ./kafka/data/kafka1/data:/var/lib/kafka/data
    - ./kafka/data/kafka1/security:/opt/security
  networks:
    - proxy

kafka-server.jaas

// Server config - used to authorise
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret"
user_producer="producer-secret"
user_consumer="consumer-secret";
};
// Client config used to connect to Kafka
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret";
};
// Client config used to connect to ZooKeeper
Client {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="admin"
password="admin-secret";
};

zookeeper-server.jaas

Server {
org.apache.zookeeper.server.auth.DigestLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret";
};

ZooKeeper logs

[2021-12-26 14:14:15,347] INFO adding SASL authorization for authorizationID: admin (org.apache.zookeeper.server.ZooKeeperServer)
[2021-12-26 14:14:18,418] INFO Successfully authenticated client: authenticationID=admin;  authorizationID=admin. (org.apache.zookeeper.server.auth.SaslServerCallbackHandler)
[2021-12-26 14:14:18,418] INFO Setting authorizedID: admin (org.apache.zookeeper.server.auth.SaslServerCallbackHandler)
[2021-12-26 14:14:18,418] INFO adding SASL authorization for authorizationID: admin (org.apache.zookeeper.server.ZooKeeperServer)
[2021-12-26 14:14:20,775] INFO Successfully authenticated client: authenticationID=admin;  authorizationID=admin. (org.apache.zookeeper.server.auth.SaslServerCallbackHandler)
[2021-12-26 14:14:20,775] INFO Setting authorizedID: admin (org.apache.zookeeper.server.auth.SaslServerCallbackHandler)
[2021-12-26 14:14:20,775] INFO adding SASL authorization for authorizationID: admin (org.apache.zookeeper.server.ZooKeeperServer)

Kafka logs

[2021-12-26 14:16:18,822] INFO [KafkaApi-1] Auto creation of topic first_topic with 1 partitions and replication factor 1 is successful (kafka.server.KafkaApis)
[2021-12-26 14:16:18,823] INFO [Controller id=1] New topics: [Set(first_topic)], deleted topics: [Set()], new partition replica assignment [Set(TopicIdReplicaAssignment(first_topic,None,Map(first_topic-0 -> ReplicaAssignment(replicas=1, addingReplicas=, removingReplicas=, observers=, targetObservers=None))))] (kafka.controller.KafkaController)
[2021-12-26 14:16:18,823] INFO [Controller id=1] New partition creation callback for first_topic-0 (kafka.controller.KafkaController)
[2021-12-26 14:16:18,824] TRACE [Controller id=1 epoch=1] Changed partition first_topic-0 state from NonExistentPartition to NewPartition with assigned replicas 1 (state.change.logger)
[2021-12-26 14:16:18,824] TRACE [Controller id=1 epoch=1] Changed state of replica 1 for partition first_topic-0 from NonExistentReplica to NewReplica (state.change.logger)
[2021-12-26 14:16:18,835] TRACE [Controller id=1 epoch=1] Changed partition first_topic-0 from NewPartition to OnlinePartition with state LeaderAndIsr(leader=1, leaderEpoch=0, isr=List(1), zkVersion=0) (state.change.logger)
[2021-12-26 14:16:18,835] TRACE [Controller id=1 epoch=1] Sending become-leader LeaderAndIsr request LeaderAndIsrPartitionState(topicName='first_topic', topicId=00000000-0000-0000-0000-000000000000, partitionIndex=0, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], observers=[], addingReplicas=[], removingReplicas=[], isNew=true) to broker 1 for partition first_topic-0 (state.change.logger)
[2021-12-26 14:16:18,836] TRACE [Controller id=1 epoch=1] Sending UpdateMetadata request UpdateMetadataPartitionState(topicName='first_topic', partitionIndex=0, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], observers=[], offlineReplicas=[]) to brokers Set(1) for partition first_topic-0 (state.change.logger)
[2021-12-26 14:16:18,836] TRACE [Controller id=1 epoch=1] Changed state of replica 1 for partition first_topic-0 from NewReplica to OnlineReplica (state.change.logger)
[2021-12-26 14:16:18,837] TRACE [Broker id=1] Received LeaderAndIsr request LeaderAndIsrPartitionState(topicName='first_topic', topicId=00000000-0000-0000-0000-000000000000, partitionIndex=0, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], observers=[], addingReplicas=[], removingReplicas=[], isNew=true) correlation id 5 from controller 1 epoch 1 (state.change.logger)
[2021-12-26 14:16:18,838] TRACE [Broker id=1] Handling LeaderAndIsr request correlationId 5 from controller 1 epoch 1 starting the become-leader transition for partition first_topic-0 (state.change.logger)
[2021-12-26 14:16:18,838] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions Set(first_topic-0) (kafka.server.ReplicaFetcherManager)
[2021-12-26 14:16:18,842] INFO [Log partition=first_topic-0, dir=/var/lib/kafka/data] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2021-12-26 14:16:18,842] INFO [Log partition=first_topic-0, dir=/var/lib/kafka/data] Completed load of log with 1 segments, log start offset (merged: 0, local: 0) and log end offset 0 in 2 ms (kafka.log.Log)
[2021-12-26 14:16:18,842] INFO [Log partition=first_topic-0, dir=/var/lib/kafka/data] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2021-12-26 14:16:18,843] INFO Completed load of log with 1 segments containing 1 local segments and 0 tiered segments, tier start offset 0, first untiered offset 0, local start offset 0, log end offset 0 (kafka.log.MergedLog)
[2021-12-26 14:16:18,844] INFO Created log for partition first_topic-0 in /var/lib/kafka/data/first_topic-0 with properties {compression.type -> producer, message.downconversion.enable -> true, confluent.missing.id.cache.ttl.sec -> 60, min.insync.replicas -> 1, segment.jitter.ms -> 0, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, confluent.tier.local.hotset.ms -> 86400000, confluent.tier.local.hotset.bytes -> -1, segment.bytes -> 1073741824, retention.ms -> 604800000, flush.messages -> 9223372036854775807, confluent.append.record.interceptor.classes -> [], confluent.tier.enable -> false, message.format.version -> 2.4-IV1, file.delete.delay.ms -> 60000, max.compaction.lag.ms -> 9223372036854775807, max.message.bytes -> 1000012, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, confluent.schema.registry.max.cache.size -> 10000, unclean.leader.election.enable -> false, confluent.missing.id.query.range -> 200, retention.bytes -> -1, delete.retention.ms -> 86400000, confluent.schema.registry.max.retries -> 1, segment.ms -> 604800000, confluent.schema.registry.retries.wait.ms -> 0, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760}. (kafka.log.LogManager)
[2021-12-26 14:16:18,845] INFO [Partition first_topic-0 broker=1] No checkpointed highwatermark is found for partition first_topic-0 (kafka.cluster.Partition)
[2021-12-26 14:16:18,845] INFO [Partition first_topic-0 broker=1] Log loaded for partition first_topic-0 with initial high watermark 0 (kafka.cluster.Partition)
[2021-12-26 14:16:18,845] INFO [Partition first_topic-0 broker=1] first_topic-0 starts at leader epoch 0 from offset 0 with high watermark 0. Previous leader epoch was -1. (kafka.cluster.Partition)
[2021-12-26 14:16:18,851] TRACE [Broker id=1] Stopped fetchers as part of become-leader request from controller 1 epoch 1 with correlation id 5 for partition first_topic-0 (last update controller epoch 1) (state.change.logger)
[2021-12-26 14:16:18,851] TRACE [Broker id=1] Completed LeaderAndIsr request correlationId 5 from controller 1 epoch 1 for the become-leader transition for partition first_topic-0 (state.change.logger)
[2021-12-26 14:16:18,852] TRACE [Controller id=1 epoch=1] Received response {error_code=0,partition_errors=[{topic_name=first_topic,partition_index=0,error_code=0,_tagged_fields={}}],_tagged_fields={}} for request LEADER_AND_ISR with correlation id 5 sent to broker 6e48df004a23:9092 (id: 1 rack: null) (state.change.logger)
[2021-12-26 14:16:18,853] TRACE [Broker id=1] Cached leader info UpdateMetadataPartitionState(topicName='first_topic', partitionIndex=0, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], observers=[], offlineReplicas=[]) for partition first_topic-0 in response to UpdateMetadata request sent by controller 1 epoch 1 with correlation id 6 (state.change.logger)
[2021-12-26 14:16:18,854] TRACE [Controller id=1 epoch=1] Received response {error_code=0,_tagged_fields={}} for request UPDATE_METADATA with correlation id 6 sent to broker 6e48df004a23:9092 (id: 1 rack: null) (state.change.logger)

Python producer example script

from kafka import KafkaProducer
topic = "first_topic"
sasl_mechanism = "PLAIN"
username = "admin"
password = "admin-secret"
security_protocol = "SASL_PLAINTEXT"
bootstrap_servers = ['kafka.***:9092']
producer = KafkaProducer(bootstrap_servers=bootstrap_servers,
                         #api_version=(0, 10),
                         security_protocol=security_protocol,
                         #ssl_context=context,
                         #ssl_check_hostname=True,
                         #ssl_cafile='../keys/CARoot.pem',
                         sasl_mechanism=sasl_mechanism,
                         sasl_plain_username=username,
                         sasl_plain_password=password)
#ssl_certfile='../keys/certificate.pem',
#ssl_keyfile='../keys/key.pem')#,api_version = (0, 10))
#producer = KafkaProducer()
future = producer.send('first_topic', b'another_message again')
result = future.get(timeout=60)
print(result)

Python script error message:

Exception has occurred: KafkaTimeoutError
KafkaTimeoutError: Batch for TopicPartition(topic='first_topic', partition=0) containing 1 record(s) expired: 31 seconds have passed since batch creation plus linger time
File "C:\git\kafka_test\producer_acl.py", line 22, in <module>
result = future.get(timeout=60)

Please help me and point me in the right direction to solve this problem.

Thank you very much,

Simon

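It turns out that you can in fact reach Kafka, but only from inside Docker, using kafka:9092.

The reason is that KAFKA_ADVERTISED_LISTENERS must be set to the external IP address or domain. For more information, see https://rmoff.net/2018/08/02/kafka-listeners-explained/ and https://www.confluent.io/blog/kafka-listeners-explained/

LISTENERS are the interfaces Kafka binds to. ADVERTISED_LISTENERS are the addresses the broker hands back to clients, which they then use to connect.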
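To make the distinction concrete, here is a small sketch that parses a listener string and flags entries with an empty host. As far as I understand, when the host of an advertised listener is empty the broker falls back to its own hostname, which inside a container is usually the container ID (compare `6e48df004a23:9092` in the Kafka logs of the question), and a client outside Docker cannot resolve that. The helper names `parse_listeners` and `listeners_without_host` are made up for this illustration and are not part of Kafka or kafka-python.

```python
def parse_listeners(value):
    """Split a Kafka listener string into {listener_name: (host, port)}."""
    listeners = {}
    for entry in value.split(","):
        # "EXTERNAL://kafka:9092" -> name "EXTERNAL", address "kafka:9092"
        name, _, address = entry.strip().partition("://")
        # Split on the last colon so the host may be empty (":9092").
        host, _, port = address.rpartition(":")
        listeners[name] = (host, int(port))
    return listeners


def listeners_without_host(value):
    """Return the names of listeners whose host part is empty."""
    return [name for name, (host, _) in parse_listeners(value).items() if not host]


# Values copied from the two compose files in this thread.
question = "EXTERNAL://:9092,INTERNAL://kafka:9093"
answer = "EXTERNAL://kafka:9092,INTERNAL://localhost:9093,OUTSIDE://your.ip.addr:9094"

print(listeners_without_host(question))    # ['EXTERNAL']
print(listeners_without_host(answer))      # []
print(parse_listeners(answer)["OUTSIDE"])  # ('your.ip.addr', 9094)
```

An empty host is fine for LISTENERS (bind on all interfaces), but for ADVERTISED_LISTENERS it leaves the client with an address it probably cannot reach.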

Here is my docker-compose.yml.

Note: replace your.ip.addr below with your own IP address or domain name.

...
...
kafka:
  image: confluentinc/cp-server:5.4.1
  ports:
    - '9092:9092'
    - '9093:9093'
    - '9094:9094'
    - '31001:31001'
  depends_on:
    - zookeeper
  environment:
    KAFKA_BROKER_ID: 1
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    KAFKA_INTER_BROKER_LISTENER_NAME: EXTERNAL
    KAFKA_LISTENERS: "EXTERNAL://kafka:9092,INTERNAL://localhost:9093,OUTSIDE://:9094"
    KAFKA_ADVERTISED_LISTENERS: "EXTERNAL://kafka:9092,INTERNAL://localhost:9093,OUTSIDE://your.ip.addr:9094"
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "EXTERNAL:SASL_PLAINTEXT,INTERNAL:SASL_PLAINTEXT,OUTSIDE:SASL_PLAINTEXT"
    KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100
...
...
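With this configuration, a client running outside Docker would presumably bootstrap against the OUTSIDE listener on port 9094 instead of 9092. Below is a minimal sketch of the producer from the question adapted to that listener. It assumes the kafka-python package and reuses the admin credentials from kafka-server.jaas; `outside_producer_config` and `send_test_message` are hypothetical helper names, and `your.ip.addr` is the same placeholder as in the compose file.

```python
def outside_producer_config(host, port=9094, username="admin", password="admin-secret"):
    """Build KafkaProducer keyword arguments for the OUTSIDE listener."""
    return {
        "bootstrap_servers": [f"{host}:{port}"],
        "security_protocol": "SASL_PLAINTEXT",  # matches the OUTSIDE entry in the protocol map
        "sasl_mechanism": "PLAIN",
        "sasl_plain_username": username,
        "sasl_plain_password": password,
    }


def send_test_message(host, topic="first_topic"):
    """Send one message through the OUTSIDE listener and return its metadata."""
    # Imported lazily so the config helper works without kafka-python installed.
    from kafka import KafkaProducer

    producer = KafkaProducer(**outside_producer_config(host))
    try:
        # Block until the broker acknowledges the record or the timeout expires.
        return producer.send(topic, b"another_message again").get(timeout=60)
    finally:
        producer.close()


config = outside_producer_config("your.ip.addr")
print(config["bootstrap_servers"])  # ['your.ip.addr:9094']
```

Once the stack is up, calling `send_test_message("your.ip.addr")` from the host (with the placeholder replaced) should return the record metadata rather than raising KafkaTimeoutError, provided port 9094 is reachable.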
