卡夫卡制作人.消息未被发送



错误

Expiring 2 record(s) for catering-0:120012 ms has passed since batch creation

首先我使用了配置

zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
ports:
- "2181:2181"
restart: always

kafka:
image: wurstmeister/kafka
container_name: kafka
ports:
- "9092:9092"
depends_on:
- zookeeper
restart: always
environment:
#      KAFKA_ADVERTISED_HOST_NAME: kafka
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CREATE_TOPICS: "test_topic:1:3"

但由于我无法通过localhost和容器同样成功地连接到kafka,我不得不更改这些容器的图像和环境

zookeeper1:
image: confluentinc/cp-zookeeper
container_name: zookeeper
ports:
- "2181:2181"
restart: always
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ZOOKEEPER_SYNC_LIMIT: 2
kafka1:
image: confluentinc/cp-kafka
container_name: kafka
ports:
- target: 9094
published: 9094
protocol: tcp
mode: host
depends_on:
- zookeeper1
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS: INTERNAL://0.0.0.0:9092,OUTSIDE://0.0.0.0:9094
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,OUTSIDE://localhost:9094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_CREATE_TOPICS: "catering:1:3"

在使用者和生产者的配置中,只有端口发生了更改,但现在来自生产者的消息被发送,使用者根本无法到达。过了一段时间,制作人写道,发送消息的时间已经到期,到此结束。也许有人可以帮忙。。。回到最初的配置没有帮助,现在导致了同样的问题生产者配置:

kafka:
producer:
bootstrap-servers: localhost:9094
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

消费者配置:

kafka:
consumer:
bootstrap-servers: localhost:9094
group-id: json
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer

也许这很重要,但在应用程序启动后的第一分钟,我就出现了这个错误。但随后成功连接

[AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.

更改端口并不能解决批过期问题。卡夫卡制作者不会立即发送数据。您需要显式刷新生产者,降低生产者的批量大小配置,或者发送足够的数据来填充默认的批量大小。

如果生产者批次过期,则不会发送任何记录,因此消费者将看不到任何内容。

要修复管理客户端,请使用顶级spring.kafka.bootstrap-servers配置,而不是单独的生产者和消费者客户端设置。

相关内容

  • 没有找到相关文章

最新更新