I'm very new to Docker and Kafka, and I have a simple Kafka Python publisher/consumer setup, shown below.
Here is my Dockerfile:
FROM python:3.10
WORKDIR /app
COPY . /app
RUN pip install --user pip==23.0.1 && pip install pipenv && pipenv install --system
ENV ENVIRONMENT=production
CMD ["python3", "src/consumer.py"]
And here is my docker-compose.yml:
version: '3'
services:
  zookeeper:
    image: confluent/zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: confluent/kafka
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  publisher:
    container_name: publisher
    build:
      context: ./publisher
      dockerfile: Dockerfile
    environment:
      KAFKA_BOOTSTRAP_SERVERS: kafka:9092
      KAFKA_TOPIC: metrics
      KAFKA_BROKER: kafka:9092
    depends_on:
      - kafka
  consumer:
    container_name: consumer
    build:
      context: ./consumer
      dockerfile: Dockerfile
    environment:
      KAFKA_BOOTSTRAP_SERVERS: kafka:9092
      KAFKA_TOPIC: test_topic
      KAFKA_BROKER: kafka:9092
    depends_on:
      - kafka
In consumer.py I have:
print('-- consumer script -- ')
import json
from kafka import KafkaProducer, KafkaConsumer

test_topic = "test_topic"

consumer = KafkaConsumer(auto_offset_reset='earliest',
                         bootstrap_servers=['kafka:9092'],
                         api_version=(0, 10),
                         consumer_timeout_ms=10000)
consumer.subscribe([test_topic])

for message in consumer:  # the line that seems to influence docker attachment
    print('-----loop works: ', message)
In one terminal I first run:
docker-compose -f docker-compose.yml up zookeeper
and likewise for Kafka:
docker-compose -f docker-compose.yml up kafka
and then:
docker-compose -f docker-compose.yml up consumer
But the terminal seems to get stuck at the following lines:
Starting consumer ... done
Attaching to consumer
When I remove the for message in consumer line, the rest of the code runs without any problem.
With Compose, depends_on: kafka only waits for the kafka container to be started, not for the broker inside it to be ready. So your consumer is most likely hitting its timeout while the kafka container takes at least a few seconds to come up. Therefore, add a sleep(10) (for example) to the consumer before it connects, as sketched below.
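A minimal sketch of that change at the top of consumer.py (10 seconds is just an example value; a retry loop or a Compose healthcheck would be more robust):

import time
from kafka import KafkaConsumer

# Give the kafka container a head start before connecting
# (fixed delay as suggested above; tune the value to your setup).
time.sleep(10)

consumer = KafkaConsumer(auto_offset_reset='earliest',
                         bootstrap_servers=['kafka:9092'],
                         api_version=(0, 10),
                         consumer_timeout_ms=10000)
consumer.subscribe(['test_topic'])

for message in consumer:
    print('-----loop works: ', message)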
Second, the Kafka container has not created any topic automatically, so your consumer is polling a topic with no data to consume and "looks stuck" when it is really just idling. You can see this for yourself if you give the consumer a group_id and run kafka-consumer-groups --describe for that group.
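For instance, a sketch combining both ideas, assuming test_topic already exists on the broker (create it with the kafka-topics tool if it does not) and using a made-up group name:

from kafka import KafkaProducer, KafkaConsumer

# Publish one test message so there is actually something to consume.
producer = KafkaProducer(bootstrap_servers=['kafka:9092'], api_version=(0, 10))
producer.send('test_topic', b'hello from publisher')
producer.flush()
producer.close()

# A group_id makes the consumer visible to kafka-consumer-groups --describe.
consumer = KafkaConsumer('test_topic',
                         group_id='test-group',  # placeholder group name
                         auto_offset_reset='earliest',
                         bootstrap_servers=['kafka:9092'],
                         api_version=(0, 10),
                         consumer_timeout_ms=10000)

for message in consumer:
    print('received:', message.value)

With a group_id set, the --describe output for that group shows committed offsets and lag, which makes it easy to tell "idling on an empty topic" apart from "actually stuck".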
Note: by default Kafka only listens on localhost (at least in a bare-metal install), so I always recommend updating the bind address as well:
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
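Once the listener is bound to 0.0.0.0, a tiny probe run from inside another container can confirm that the broker is reachable at kafka:9092 (illustrative only):

from kafka import KafkaConsumer

# If this hangs or raises instead of printing a set of topic names,
# the listener/advertised listener is still not reachable from this container.
probe = KafkaConsumer(bootstrap_servers=['kafka:9092'], api_version=(0, 10))
print(probe.topics())
probe.close()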