How do I set a timeout for KafkaConsumer when the broker address is incorrect, the topic has no messages, or the topic is missing?



I have this method, and it works well. With it I can fetch messages from a specific Kafka topic.

public List<MessageResponseDTO> getMessagesFromTopic(String topicName, Properties properties) {
    List<MessageResponseDTO> messages = new ArrayList<>();
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
        consumer.subscribe(Collections.singletonList(topicName));
        // poll() returns immediately if records are available; otherwise it waits up to the given timeout.
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1000));
        for (ConsumerRecord<String, String> recordMessage : records) {
            String value = recordMessage.value();
            String formattedDate = getFormattedDateFromTimeStamp(recordMessage);
            MessageResponseDTO buildMessage = MessageResponseDTO.builder().message(value).date(formattedDate).build();
            messages.add(buildMessage);
        }
    }
    return messages;
}

The problem appears when I enter an incorrect broker address.

Logs below:

2021-10-09 13:39:55.723  WARN 5020 --- [nio-8080-exec-4] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-consumer-test-group-spring-boot-1, groupId=consumer-test-group-spring-boot] Connection to node -1 (/192.168.0.74:9095) could not be established. Broker may not be available.
2021-10-09 13:39:55.724  WARN 5020 --- [nio-8080-exec-4] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-consumer-test-group-spring-boot-1, groupId=consumer-test-group-spring-boot] Bootstrap broker 192.168.0.74:9095 (id: -1 rack: null) disconnected
2021-10-09 13:39:57.865  WARN 5020 --- [nio-8080-exec-4] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-consumer-test-group-spring-boot-1, groupId=consumer-test-group-spring-boot] Connection to node -2 (/192.168.0.74:9096) could not be established. Broker may not be available.
2021-10-09 13:39:57.866  WARN 5020 --- [nio-8080-exec-4] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-consumer-test-group-spring-boot-1, groupId=consumer-test-group-spring-boot] Bootstrap broker 192.168.0.74:9096 (id: -2 rack: null) disconnected
2021-10-09 13:39:59.999  WARN 5020 --- [nio-8080-exec-4] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-consumer-test-group-spring-boot-1, groupId=consumer-test-group-spring-boot] Connection to node -3 (/192.168.0.74:9097) could not be established. Broker may not be available.
2021-10-09 13:39:59.999  WARN 5020 --- [nio-8080-exec-4] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-consumer-test-group-spring-boot-1, groupId=consumer-test-group-spring-boot] Bootstrap broker 192.168.0.74:9097 (id: -3 rack: null) disconnected
2021-10-09 13:40:02.133  WARN 5020 --- [nio-8080-exec-4] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-consumer-test-group-spring-boot-1, groupId=consumer-test-group-spring-boot] Connection to node -1 (/192.168.0.74:9095) could not be established. Broker may not be available.
2021-10-09 13:40:02.133  WARN 5020 --- [nio-8080-exec-4] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-consumer-test-group-spring-boot-1, groupId=consumer-test-group-spring-boot] Bootstrap broker 192.168.0.74:9095 (id: -1 rack: null) disconnected
2021-10-09 13:40:04.282  WARN 5020 --- [nio-8080-exec-4] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-consumer-test-group-spring-boot-1, groupId=consumer-test-group-spring-boot] Connection to node -2 (/192.168.0.74:9096) could not be established. Broker may not be available.
2021-10-09 13:40:04.282  WARN 5020 --- [nio-8080-exec-4] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-consumer-test-group-spring-boot-1, groupId=consumer-test-group-spring-boot] Bootstrap broker 192.168.0.74:9096 (id: -2 rack: null) disconnected
2021-10-09 13:40:06.437  WARN 5020 --- [nio-8080-exec-4] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-consumer-test-group-spring-boot-1, groupId=consumer-test-group-spring-boot] Connection to node -3 (/192.168.0.74:9097) could not be established. Broker may not be available.
2021-10-09 13:40:06.437  WARN 5020 --- [nio-8080-exec-4] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-consumer-test-group-spring-boot-1, groupId=consumer-test-group-spring-boot] Bootstrap broker 192.168.0.74:9097 (id: -3 rack: null) disconnected
2021-10-09 13:40:08.570  WARN 5020 --- [nio-8080-exec-4] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-consumer-test-group-spring-boot-1, groupId=consumer-test-group-spring-boot] Connection to node -1 (/192.168.0.74:9095) could not be established. Broker may not be available.
2021-10-09 13:40:08.570  WARN 5020 --- [nio-8080-exec-4] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-consumer-test-group-spring-boot-1, groupId=consumer-test-group-spring-boot] Bootstrap broker 192.168.0.74:9095 (id: -1 rack: null) disconnected
2021-10-09 13:40:10.706  WARN 5020 --- [nio-8080-exec-4] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-consumer-test-group-spring-boot-1, groupId=consumer-test-group-spring-boot] Connection to node -2 (/192.168.0.74:9096) could not be established. Broker may not be available.
2021-10-09 13:40:10.706  WARN 5020 --- [nio-8080-exec-4] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-consumer-test-group-spring-boot-1, groupId=consumer-test-group-spring-boot] Bootstrap broker 192.168.0.74:9096 (id: -2 rack: null) disconnected
2021-10-09 13:40:12.843  WARN 5020 --- [nio-8080-exec-4] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-consumer-test-group-spring-boot-1, groupId=consumer-test-group-spring-boot] Connection to node -3 (/192.168.0.74:9097) could not be established. Broker may not be available.
2021-10-09 13:40:12.843  WARN 5020 --- [nio-8080-exec-4] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-consumer-test-group-spring-boot-1, groupId=consumer-test-group-spring-boot] Bootstrap broker 192.168.0.74:9097 (id: -3 rack: null) disconnected
2021-10-09 13:40:14.991  WARN 5020 --- [nio-8080-exec-4] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-consumer-test-group-spring-boot-1, groupId=consumer-test-group-spring-boot] Connection to node -1 (/192.168.0.74:9095) could not be established. Broker may not be available.
2021-10-09 13:40:14.991  WARN 5020 --- [nio-8080-exec-4] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-consumer-test-group-spring-boot-1, groupId=consumer-test-group-spring-boot] Bootstrap broker 192.168.0.74:9095 (id: -1 rack: null) disconnected
2021-10-09 13:40:17.161  WARN 5020 --- [nio-8080-exec-4] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-consumer-test-group-spring-boot-1, groupId=consumer-test-group-spring-boot] Connection to node -2 (/192.168.0.74:9096) could not be established. Broker may not be available.
2021-10-09 13:40:17.161  WARN 5020 --- [nio-8080-exec-4] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-consumer-test-group-spring-boot-1, groupId=consumer-test-group-spring-boot] Bootstrap broker 192.168.0.74:9096 (id: -2 rack: null) disconnected

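Each time I have to wait about 80 seconds, after which the consumer is closed and the method returns 0 messages. How can I set a timeout on the consumer so that it waits, for example, 5 seconds instead of 80?

I mean something like...

consumer.setTimeoutForBroker(example)

Here is my configuration as well: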

private Properties prepareProperties(String brokerAddressByConnectionName) {
    Properties props = new Properties();
    props.put("bootstrap.servers", brokerAddressByConnectionName);
    props.put("group.id", "consumer-test-group-spring-boot");
    props.put("auto.offset.reset", "earliest");
    props.put("enable.auto.commit", "false");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    return props;
}

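Another problem: if I enter a valid broker address but a wrong topic, the consumer creates the topic, starts listening, and returns after a timeout of about 80 seconds. Can this timeout be shortened through configuration?

Logs below: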

2021-10-09 13:44:53.169  INFO 14500 --- [nio-8080-exec-4] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.7.1
2021-10-09 13:44:53.169  INFO 14500 --- [nio-8080-exec-4] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 61dbce85d0d41457
2021-10-09 13:44:53.169  INFO 14500 --- [nio-8080-exec-4] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1633779893169
2021-10-09 13:44:53.169  INFO 14500 --- [nio-8080-exec-4] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-consumer-test-group-spring-boot-3, groupId=consumer-test-group-spring-boot] Subscribed to topic(s): TestTopic312
2021-10-09 13:44:55.200  WARN 14500 --- [nio-8080-exec-4] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-consumer-test-group-spring-boot-3, groupId=consumer-test-group-spring-boot] Connection to node -1 (/192.168.0.74:9091) could not be established. Broker may not be available.
2021-10-09 13:44:55.201  WARN 14500 --- [nio-8080-exec-4] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-consumer-test-group-spring-boot-3, groupId=consumer-test-group-spring-boot] Bootstrap broker 192.168.0.74:9091 (id: -1 rack: null) disconnected
2021-10-09 13:44:57.342  WARN 14500 --- [nio-8080-exec-4] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-consumer-test-group-spring-boot-3, groupId=consumer-test-group-spring-boot] Connection to node -3 (/192.168.0.74:9093) could not be established. Broker may not be available.
2021-10-09 13:44:57.342  WARN 14500 --- [nio-8080-exec-4] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-consumer-test-group-spring-boot-3, groupId=consumer-test-group-spring-boot] Bootstrap broker 192.168.0.74:9093 (id: -3 rack: null) disconnected
2021-10-09 13:44:57.475  WARN 14500 --- [nio-8080-exec-4] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-consumer-test-group-spring-boot-3, groupId=consumer-test-group-spring-boot] Error while fetching metadata with correlation id 2 : {TestTopic312=LEADER_NOT_AVAILABLE}
2021-10-09 13:44:57.475  INFO 14500 --- [nio-8080-exec-4] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-consumer-test-group-spring-boot-3, groupId=consumer-test-group-spring-boot] Cluster ID: pEMDApYNT9a6zyYgPfzEdQ
2021-10-09 13:44:57.484  INFO 14500 --- [nio-8080-exec-4] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-consumer-test-group-spring-boot-3, groupId=consumer-test-group-spring-boot] Discovered group coordinator 192.168.0.74:9092 (id: 2147482646 rack: null)
2021-10-09 13:44:57.485  INFO 14500 --- [nio-8080-exec-4] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-consumer-test-group-spring-boot-3, groupId=consumer-test-group-spring-boot] (Re-)joining group
2021-10-09 13:44:57.493  INFO 14500 --- [nio-8080-exec-4] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-consumer-test-group-spring-boot-3, groupId=consumer-test-group-spring-boot] (Re-)joining group
2021-10-09 13:44:57.500  INFO 14500 --- [nio-8080-exec-4] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-consumer-test-group-spring-boot-3, groupId=consumer-test-group-spring-boot] Successfully joined group with generation Generation{generationId=1, memberId='consumer-consumer-test-group-spring-boot-3-a40c2b95-6068-4806-8a65-370455d9001a', protocol='range'}
2021-10-09 13:44:57.593  INFO 14500 --- [nio-8080-exec-4] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-consumer-test-group-spring-boot-3, groupId=consumer-test-group-spring-boot] Finished assignment for group at generation 1: {consumer-consumer-test-group-spring-boot-3-a40c2b95-6068-4806-8a65-370455d9001a=Assignment(partitions=[TestTopic312-0])}
2021-10-09 13:44:57.598  INFO 14500 --- [nio-8080-exec-4] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-consumer-test-group-spring-boot-3, groupId=consumer-test-group-spring-boot] Successfully synced group in generation Generation{generationId=1, memberId='consumer-consumer-test-group-spring-boot-3-a40c2b95-6068-4806-8a65-370455d9001a', protocol='range'}
2021-10-09 13:44:57.598  INFO 14500 --- [nio-8080-exec-4] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-consumer-test-group-spring-boot-3, groupId=consumer-test-group-spring-boot] Notifying assignor about the new Assignment(partitions=[TestTopic312-0])
2021-10-09 13:44:57.598  INFO 14500 --- [nio-8080-exec-4] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-consumer-test-group-spring-boot-3, groupId=consumer-test-group-spring-boot] Adding newly assigned partitions: TestTopic312-0
2021-10-09 13:44:57.601  INFO 14500 --- [nio-8080-exec-4] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-consumer-test-group-spring-boot-3, groupId=consumer-test-group-spring-boot] Found no committed offset for partition TestTopic312-0
2021-10-09 13:44:57.604  INFO 14500 --- [nio-8080-exec-4] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-consumer-test-group-spring-boot-3, groupId=consumer-test-group-spring-boot] Resetting offset for partition TestTopic312-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[192.168.0.74:9092 (id: 1001 rack: null)], epoch=0}}.
2021-10-09 13:46:33.176  INFO 14500 --- [nio-8080-exec-4] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-consumer-test-group-spring-boot-3, groupId=consumer-test-group-spring-boot] Revoke previously assigned partitions TestTopic312-0
2021-10-09 13:46:33.176  INFO 14500 --- [nio-8080-exec-4] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-consumer-test-group-spring-boot-3, groupId=consumer-test-group-spring-boot] Member consumer-consumer-test-group-spring-boot-3-a40c2b95-6068-4806-8a65-370455d9001a sending LeaveGroup request to coordinator 192.168.0.74:9092 (id: 2147482646 rack: null) due to the consumer is being closed
2021-10-09 13:46:33.181  INFO 14500 --- [nio-8080-exec-4] org.apache.kafka.common.metrics.Metrics  : Metrics scheduler closed
2021-10-09 13:46:33.181  INFO 14500 --- [nio-8080-exec-4] org.apache.kafka.common.metrics.Metrics  : Closing reporter org.apache.kafka.common.metrics.JmxReporter
2021-10-09 13:46:33.181  INFO 14500 --- [nio-8080-exec-4] org.apache.kafka.common.metrics.Metrics  : Metrics reporters closed
2021-10-09 13:46:33.183  INFO 14500 --- [nio-8080-exec-4] o.a.kafka.common.utils.AppInfoParser     : App info kafka.consumer for consumer-consumer-test-group-spring-boot-3 unregistered

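My main goal is that the user should not have to wait more than a minute, so I would like to be able to set the timeout manually for a bad broker address, for a topic with no messages, or for a topic that does not exist at all.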
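One generic way to enforce such an upper bound, independent of which Kafka client timeout is involved, is to run the blocking call on a worker thread and stop waiting after a fixed limit. The following is only a sketch using the JDK; `BoundedCall` and `callWithLimit` are hypothetical names, and the sleeping task stands in for a call like `getMessagesFromTopic(topicName, properties)`.

```java
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of the Kafka client.
final class BoundedCall {

    /**
     * Runs the task on a worker thread and waits at most `limit` for its result.
     * If the limit passes first, the worker is interrupted and `fallback` is returned.
     */
    static <T> T callWithLimit(Callable<T> task, Duration limit, T fallback) {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "bounded-call");
            t.setDaemon(true); // a stuck call should not keep the JVM alive
            return t;
        });
        Future<T> future = executor.submit(task);
        try {
            return future.get(limit.toMillis(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupts the worker thread
            return fallback;
        } catch (InterruptedException e) {
            future.cancel(true);
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            return fallback;
        } catch (ExecutionException e) {
            throw new IllegalStateException("Task failed", e.getCause());
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Stand-in for a slow call such as getMessagesFromTopic(topicName, properties).
        String result = callWithLimit(() -> {
            Thread.sleep(10_000);
            return "messages";
        }, Duration.ofMillis(200), "timed out");
        System.out.println(result); // prints "timed out"
    }
}
```

Because KafkaConsumer is not safe for use from multiple threads, this only fits when the consumer is created, polled, and closed entirely inside the task, as in the method above. The abandoned call may still take a moment to finish in the background after the caller has received the fallback, and a real application would probably share one executor instead of creating one per call.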

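Do you want to detect a wrong broker or topic promptly? What are your broker settings? If you only need to detect wrong topics and brokers, you can set auto.create.topics.enable=false on the broker. An error will then be reported when the topic does not exist.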
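On the consumer side, a related sketch in Java: the snippet below copies the consumer properties and adds two settings from the Kafka consumer configuration, `default.api.timeout.ms` and `allow.auto.create.topics`. The class and method names (`ConsumerTimeoutProps`, `withBoundedTimeouts`) are made up for illustration, and whether these settings are enough depends on the client and broker versions in use.

```java
import java.time.Duration;
import java.util.Properties;

// Hypothetical helper, not part of the Kafka client.
final class ConsumerTimeoutProps {

    /**
     * Returns a copy of the given consumer properties with a bounded API timeout
     * and client-side topic auto-creation switched off.
     */
    static Properties withBoundedTimeouts(Properties base, Duration apiTimeout) {
        Properties props = new Properties();
        props.putAll(base);
        // Upper bound for consumer calls that take no explicit timeout argument.
        props.put("default.api.timeout.ms", String.valueOf(apiTimeout.toMillis()));
        // Ask the client not to trigger topic auto-creation when subscribing.
        props.put("allow.auto.create.topics", "false");
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.put("bootstrap.servers", "192.168.0.74:9092");
        Properties bounded = withBoundedTimeouts(base, Duration.ofSeconds(5));
        System.out.println(bounded.getProperty("default.api.timeout.ms")); // prints 5000
    }
}
```

Note that `default.api.timeout.ms` does not shorten `poll(Duration)`: that call waits for at most the Duration passed to it, so the value given to `poll` in the question's method is the first thing to reduce. To fail fast on an unreachable broker before polling, one option is the consumer's `partitionsFor(topic, Duration)` overload, which throws a TimeoutException when topic metadata cannot be fetched within the given time.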
