我在Google Cloud上有一个3个实例的kafka集群。
我想在另一个外部服务器上通过python文件配置消费者。
下面是python代码:from kafka import KafkaConsumer
from json import loads
if __name__ == "__main__":
topic_name = "test"
consumer = KafkaConsumer(
topic_name,
bootstrap_servers=[
"externalIP:9092",
"externalIP:9092",
"externalIP:9092",
],
auto_offset_reset="latest",
enable_auto_commit=True,
group_id="test-consumer-group",
value_deserializer=lambda x: loads(x.decode("utf-8")),
consumer_timeout_ms=1000,
)
print(consumer.topics())
print("[begin] get consumer list")
for message in consumer:
print("test")
print(
"Topic: %s, Partition: %d, Offset: %d, Key: %s, Value: %s"
% (
message.topic,
message.partition,
message.offset,
message.key,
message.value,
)
)
print("[end] get consumer list")
配置/服务器。在GCP实例上配置的属性文件如下所示:
broker.id=0
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://kafka-1:9092
我根据搜索中出现的所有情况更改了配置文件,但只有主题在Python代码中打印,并且无法接收数据。
(数据采集与生产者在同一台服务器上实时确认)
我试过断开内部和外部的连接,我试了所有的方法,但是我不能得到它。请帮帮我
你需要通告一个外部可解析的地址。
请参考- https://www.confluent.io/blog/kafka-listeners-explained/
如果你对使用GKE Kubernetes感兴趣,你可以部署Strimzi操作符,它可以用来为你配置外部可解析的地址,以及其他功能。