Kafka:从本地机器连接到以k8s运行的远程机器Kafka Broker



大家好!

主要问题是:我想通过我自己的清单从我的本地机器连接到Kafka,它在k8s容器中运行在集群上(让它是DNSnode03.st)。

zookeeper部署清单在这里(图片:confluentinc/cp-zookeeper:6.2.4):

---
apiVersion: apps/v1
kind: Deployment
metadata:
namespace: aptmess
name: zookeeper-aptmess-deployment
labels:
name: zookeeper-service-filter
spec:
selector:
matchLabels:
app: zookeeper-label
template:
metadata:
labels:
app: zookeeper-label
spec:
containers:
- name: zookeeper
image: confluentinc/cp-zookeeper:6.2.4
imagePullPolicy: IfNotPresent
ports:
- containerPort: 2181 # ZK client
name: client
- containerPort: 2888 # Follower
name: follower
- containerPort: 3888 # Election
name: election
- containerPort: 8080 # AdminServer
name: admin-server
env:
- name: ZOOKEEPER_ID
value: "1"
- name: ZOOKEEPER_SERVER_1
value: zookeeper
- name: ZOOKEEPER_CLIENT_PORT
value: "2181"
- name: ZOOKEEPER_TICK_TIME
value: "2000"
---
apiVersion: v1
kind: Service
metadata:
namespace: aptmess
name: zookeeper-service-aptmess
labels:
name: zookeeper-service-filter
spec:
type: NodePort
ports:
- port: 2181
protocol: TCP
name: client
- name: follower
port: 2888
protocol: TCP
- name: election
port: 3888
protocol: TCP
- port: 8080
protocol: TCP
name: admin-server
selector:
app: zookeeper-label

我的kafka statfulset清单(image:confluentinc/cp-kafka:6.2.4):


---
apiVersion: apps/v1
kind: StatefulSet
metadata:
namespace: aptmess
name: kafka-stateful-set-aptmess
labels:
name: kafka-service-filter
spec:
serviceName: kafka-broker
replicas: 1
podManagementPolicy: Parallel
updateStrategy:
type: RollingUpdate
selector:
matchLabels:
app: kafka-label
template:
metadata:
labels:
app: kafka-label
spec:
volumes:
- name: config
emptyDir: {}
- name: extensions
emptyDir: {}
- name: kafka-storage
persistentVolumeClaim:
claimName: kafka-data-claim
terminationGracePeriodSeconds: 300
containers:
- name: kafka
image: confluentinc/cp-kafka:6.2.4
imagePullPolicy: Always
ports:
- containerPort: 9092
resources:
requests:
memory: "2Gi"
cpu: "1"
command:
- bash
- -c
- unset KAFKA_PORT; /etc/confluent/docker/run
env:
- name: KAFKA_ADVERTISED_HOST_NAME
value: kafka-broker
- name: KAFKA_ZOOKEEPER_CONNECT
value: zookeeper-service-aptmess:2181
- name: KAFKA_BROKER_ID
value: "1"
- name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
value: "1"
- name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
value: "PLAINTEXT:PLAINTEXT,CONNECTIONS_FROM_HOST:PLAINTEXT"
- name: KAFKA_INTER_BROKER_LISTENER_NAME
value: "PLAINTEXT"
- name: KAFKA_LISTENERS
value: "PLAINTEXT://0.0.0.0:9092"
- name: KAFKA_ADVERTISED_LISTENERS
value: "PLAINTEXT://kafka-broker.aptmess.svc.cluster.local:9092"
volumeMounts:
- name: config
mountPath: /etc/kafka
- name: extensions
mountPath: /opt/kafka/libs/extensions
- name: kafka-storage
mountPath: /var/lib/kafka/
securityContext:
runAsUser: 1000
fsGroup: 1000
---
apiVersion: v1
kind: Service
metadata:
namespace: aptmess
name: kafka-broker
labels:
name: kafka-service-filter
spec:
type: NodePort
ports:
- port: 9092
name: kafka-port
protocol: TCP
selector:
app: kafka-label

9092的NodePort为30000

当我尝试从本地主机连接时,得到错误:

from kafka import KafkaProducer
producer = KafkaProducer(
bootstrap_servers=['node03.st:30000']
)
>> Error connecting to node kafka-broker.aptmess.svc.cluster.local:9092 (id: 1 rack: null)

我花了很长时间更换内部和外部监听器,但这对我没有帮助。我应该做些什么来达到从我的本地主机发送消息到远程Kafka代理的目标?

提前感谢!

p。s:我已经搜索了这个链接来找到结果:

  • 在Strimzi Kafka的LoadBalancer上使用sram - sha -512 SSL认证
  • https://github.com/strimzi/strimzi-kafka-operator/issues/1156
  • https://github.com/strimzi/strimzi-kafka-operator/issues/1463
  • https://githubhelp.com/Yolean/kubernetes-kafka/issues/328?ysclid=l4grqi7hc6364785597
  • 从本地机器连接EC2机器上运行的Kafka
  • 在远程机器上访问kafka代理错误
  • 如何从kubernetes (minikube)中的应用程序连接到本地主机(主机)上的kafka
  • kafka代理启动时不可用
  • https://github.com/SOHU-Co/kafka-node/issues/666
  • https://docs.confluent.io/operator/current/co-nodeports.html
  • https://developers.redhat.com/blog/2019/06/07/accessing-apache-kafka-in-strimzi-part-2-node-ports
  • https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/
  • Kafka在Kubernetes集群-如何发布/消费来自Kubernetes集群外部的消息
  • Kafka docker编写外部连接
  • <
  • confluentinc图像/gh>

9092端口的NodePort为30000

然后你需要定义该节点的主机名和端口作为KAFKA_ADVERTISED_LISTENERS的一部分,正如在许多链接的帖子中提到的…您只定义了一个侦听器,并且它在k8s内部…但是,请记住,这是一个糟糕的解决方案,除非您强制代理pod只在一个主机和一个端口上运行。

或者,用Strimzi操作符替换你的设置,并阅读如何使用Ingress资源(理想情况下)访问Kafka集群,但它们也支持NodePort - https://strimzi.io/blog/2019/04/17/accessing-kafka-part-1/(交叉参考最新的文档,因为那是一个旧的帖子)

Ingress的将是理想的,因为Ingress控制器将能够在拥有固定外部地址的情况下动态地将请求路由到代理pod,否则,您将经常需要使用k8s api来描述代理pod并获取它们的当前端口信息

最新更新