我有这个方法工作良好。通过这种方法,我可以从特定的Kafka主题中抓取消息。
public List<MessageResponseDTO> getMessagesFromTopic(String topicName, Properties properties) {
List<MessageResponseDTO> messages = new ArrayList<>();
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
consumer.subscribe(Collections.singletonList(topicName));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1000));//returns immediately if there are records available. Otherwise, it will await (loop for timeous ms for polling)
for (ConsumerRecord<String, String> recordMessage : records) {;
String value = recordMessage.value();
String formattedDate = getFormattedDateFromTimeStamp(recordMessage);
MessageResponseDTO buildMessage = MessageResponseDTO.builder().message(value).date(formattedDate).build();
messages.add(buildMessage);
}
}
return messages;
}
当我输入不正确的代理地址时,出现问题。
日志下面
2021-10-09 13:39:55.723 WARN 5020 --- [nio-8080-exec-4] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-consumer-test-group-spring-boot-1, groupId=consumer-test-group-spring-boot] Connection to node -1 (/192.168.0.74:9095) could not be established. Broker may not be available.
2021-10-09 13:39:55.724 WARN 5020 --- [nio-8080-exec-4] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-consumer-test-group-spring-boot-1, groupId=consumer-test-group-spring-boot] Bootstrap broker 192.168.0.74:9095 (id: -1 rack: null) disconnected
2021-10-09 13:39:57.865 WARN 5020 --- [nio-8080-exec-4] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-consumer-test-group-spring-boot-1, groupId=consumer-test-group-spring-boot] Connection to node -2 (/192.168.0.74:9096) could not be established. Broker may not be available.
2021-10-09 13:39:57.866 WARN 5020 --- [nio-8080-exec-4] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-consumer-test-group-spring-boot-1, groupId=consumer-test-group-spring-boot] Bootstrap broker 192.168.0.74:9096 (id: -2 rack: null) disconnected
2021-10-09 13:39:59.999 WARN 5020 --- [nio-8080-exec-4] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-consumer-test-group-spring-boot-1, groupId=consumer-test-group-spring-boot] Connection to node -3 (/192.168.0.74:9097) could not be established. Broker may not be available.
2021-10-09 13:39:59.999 WARN 5020 --- [nio-8080-exec-4] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-consumer-test-group-spring-boot-1, groupId=consumer-test-group-spring-boot] Bootstrap broker 192.168.0.74:9097 (id: -3 rack: null) disconnected
2021-10-09 13:40:02.133 WARN 5020 --- [nio-8080-exec-4] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-consumer-test-group-spring-boot-1, groupId=consumer-test-group-spring-boot] Connection to node -1 (/192.168.0.74:9095) could not be established. Broker may not be available.
2021-10-09 13:40:02.133 WARN 5020 --- [nio-8080-exec-4] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-consumer-test-group-spring-boot-1, groupId=consumer-test-group-spring-boot] Bootstrap broker 192.168.0.74:9095 (id: -1 rack: null) disconnected
2021-10-09 13:40:04.282 WARN 5020 --- [nio-8080-exec-4] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-consumer-test-group-spring-boot-1, groupId=consumer-test-group-spring-boot] Connection to node -2 (/192.168.0.74:9096) could not be established. Broker may not be available.
2021-10-09 13:40:04.282 WARN 5020 --- [nio-8080-exec-4] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-consumer-test-group-spring-boot-1, groupId=consumer-test-group-spring-boot] Bootstrap broker 192.168.0.74:9096 (id: -2 rack: null) disconnected
2021-10-09 13:40:06.437 WARN 5020 --- [nio-8080-exec-4] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-consumer-test-group-spring-boot-1, groupId=consumer-test-group-spring-boot] Connection to node -3 (/192.168.0.74:9097) could not be established. Broker may not be available.
2021-10-09 13:40:06.437 WARN 5020 --- [nio-8080-exec-4] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-consumer-test-group-spring-boot-1, groupId=consumer-test-group-spring-boot] Bootstrap broker 192.168.0.74:9097 (id: -3 rack: null) disconnected
2021-10-09 13:40:08.570 WARN 5020 --- [nio-8080-exec-4] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-consumer-test-group-spring-boot-1, groupId=consumer-test-group-spring-boot] Connection to node -1 (/192.168.0.74:9095) could not be established. Broker may not be available.
2021-10-09 13:40:08.570 WARN 5020 --- [nio-8080-exec-4] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-consumer-test-group-spring-boot-1, groupId=consumer-test-group-spring-boot] Bootstrap broker 192.168.0.74:9095 (id: -1 rack: null) disconnected
2021-10-09 13:40:10.706 WARN 5020 --- [nio-8080-exec-4] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-consumer-test-group-spring-boot-1, groupId=consumer-test-group-spring-boot] Connection to node -2 (/192.168.0.74:9096) could not be established. Broker may not be available.
2021-10-09 13:40:10.706 WARN 5020 --- [nio-8080-exec-4] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-consumer-test-group-spring-boot-1, groupId=consumer-test-group-spring-boot] Bootstrap broker 192.168.0.74:9096 (id: -2 rack: null) disconnected
2021-10-09 13:40:12.843 WARN 5020 --- [nio-8080-exec-4] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-consumer-test-group-spring-boot-1, groupId=consumer-test-group-spring-boot] Connection to node -3 (/192.168.0.74:9097) could not be established. Broker may not be available.
2021-10-09 13:40:12.843 WARN 5020 --- [nio-8080-exec-4] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-consumer-test-group-spring-boot-1, groupId=consumer-test-group-spring-boot] Bootstrap broker 192.168.0.74:9097 (id: -3 rack: null) disconnected
2021-10-09 13:40:14.991 WARN 5020 --- [nio-8080-exec-4] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-consumer-test-group-spring-boot-1, groupId=consumer-test-group-spring-boot] Connection to node -1 (/192.168.0.74:9095) could not be established. Broker may not be available.
2021-10-09 13:40:14.991 WARN 5020 --- [nio-8080-exec-4] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-consumer-test-group-spring-boot-1, groupId=consumer-test-group-spring-boot] Bootstrap broker 192.168.0.74:9095 (id: -1 rack: null) disconnected
2021-10-09 13:40:17.161 WARN 5020 --- [nio-8080-exec-4] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-consumer-test-group-spring-boot-1, groupId=consumer-test-group-spring-boot] Connection to node -2 (/192.168.0.74:9096) could not be established. Broker may not be available.
2021-10-09 13:40:17.161 WARN 5020 --- [nio-8080-exec-4] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-consumer-test-group-spring-boot-1, groupId=consumer-test-group-spring-boot] Bootstrap broker 192.168.0.74:9096 (id: -2 rack: null) disconnected
我必须每次等待大约80秒,这时消费者将被关闭,方法返回0条消息。我如何设置超时消费者,这样它不会等待80秒每次但是,例如,减少时间5秒?
我的意思是…
consumer.setTimeoutForBroker(例子)
这也是我的配置文件
private Properties prepareProperties(String brokerAddressByConnectionName) {
Properties props = new Properties();
props.put("bootstrap.servers", brokerAddressByConnectionName);
props.put("group.id", "consumer-test-group-spring-boot");
props.put("auto.offset.reset", "earliest");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
return props;
}
另一个问题是,如果我输入一个好的代理地址,但错误的主题,然后消费者将创建一个主题并开始侦听,并在80秒后返回超时。是否可以使用配置缩短此超时?
日志下面
2021-10-09 13:44:53.169 INFO 14500 --- [nio-8080-exec-4] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.7.1
2021-10-09 13:44:53.169 INFO 14500 --- [nio-8080-exec-4] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 61dbce85d0d41457
2021-10-09 13:44:53.169 INFO 14500 --- [nio-8080-exec-4] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1633779893169
2021-10-09 13:44:53.169 INFO 14500 --- [nio-8080-exec-4] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-consumer-test-group-spring-boot-3, groupId=consumer-test-group-spring-boot] Subscribed to topic(s): TestTopic312
2021-10-09 13:44:55.200 WARN 14500 --- [nio-8080-exec-4] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-consumer-test-group-spring-boot-3, groupId=consumer-test-group-spring-boot] Connection to node -1 (/192.168.0.74:9091) could not be established. Broker may not be available.
2021-10-09 13:44:55.201 WARN 14500 --- [nio-8080-exec-4] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-consumer-test-group-spring-boot-3, groupId=consumer-test-group-spring-boot] Bootstrap broker 192.168.0.74:9091 (id: -1 rack: null) disconnected
2021-10-09 13:44:57.342 WARN 14500 --- [nio-8080-exec-4] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-consumer-test-group-spring-boot-3, groupId=consumer-test-group-spring-boot] Connection to node -3 (/192.168.0.74:9093) could not be established. Broker may not be available.
2021-10-09 13:44:57.342 WARN 14500 --- [nio-8080-exec-4] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-consumer-test-group-spring-boot-3, groupId=consumer-test-group-spring-boot] Bootstrap broker 192.168.0.74:9093 (id: -3 rack: null) disconnected
2021-10-09 13:44:57.475 WARN 14500 --- [nio-8080-exec-4] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-consumer-test-group-spring-boot-3, groupId=consumer-test-group-spring-boot] Error while fetching metadata with correlation id 2 : {TestTopic312=LEADER_NOT_AVAILABLE}
2021-10-09 13:44:57.475 INFO 14500 --- [nio-8080-exec-4] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-consumer-test-group-spring-boot-3, groupId=consumer-test-group-spring-boot] Cluster ID: pEMDApYNT9a6zyYgPfzEdQ
2021-10-09 13:44:57.484 INFO 14500 --- [nio-8080-exec-4] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-consumer-test-group-spring-boot-3, groupId=consumer-test-group-spring-boot] Discovered group coordinator 192.168.0.74:9092 (id: 2147482646 rack: null)
2021-10-09 13:44:57.485 INFO 14500 --- [nio-8080-exec-4] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-consumer-test-group-spring-boot-3, groupId=consumer-test-group-spring-boot] (Re-)joining group
2021-10-09 13:44:57.493 INFO 14500 --- [nio-8080-exec-4] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-consumer-test-group-spring-boot-3, groupId=consumer-test-group-spring-boot] (Re-)joining group
2021-10-09 13:44:57.500 INFO 14500 --- [nio-8080-exec-4] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-consumer-test-group-spring-boot-3, groupId=consumer-test-group-spring-boot] Successfully joined group with generation Generation{generationId=1, memberId='consumer-consumer-test-group-spring-boot-3-a40c2b95-6068-4806-8a65-370455d9001a', protocol='range'}
2021-10-09 13:44:57.593 INFO 14500 --- [nio-8080-exec-4] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-consumer-test-group-spring-boot-3, groupId=consumer-test-group-spring-boot] Finished assignment for group at generation 1: {consumer-consumer-test-group-spring-boot-3-a40c2b95-6068-4806-8a65-370455d9001a=Assignment(partitions=[TestTopic312-0])}
2021-10-09 13:44:57.598 INFO 14500 --- [nio-8080-exec-4] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-consumer-test-group-spring-boot-3, groupId=consumer-test-group-spring-boot] Successfully synced group in generation Generation{generationId=1, memberId='consumer-consumer-test-group-spring-boot-3-a40c2b95-6068-4806-8a65-370455d9001a', protocol='range'}
2021-10-09 13:44:57.598 INFO 14500 --- [nio-8080-exec-4] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-consumer-test-group-spring-boot-3, groupId=consumer-test-group-spring-boot] Notifying assignor about the new Assignment(partitions=[TestTopic312-0])
2021-10-09 13:44:57.598 INFO 14500 --- [nio-8080-exec-4] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-consumer-test-group-spring-boot-3, groupId=consumer-test-group-spring-boot] Adding newly assigned partitions: TestTopic312-0
2021-10-09 13:44:57.601 INFO 14500 --- [nio-8080-exec-4] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-consumer-test-group-spring-boot-3, groupId=consumer-test-group-spring-boot] Found no committed offset for partition TestTopic312-0
2021-10-09 13:44:57.604 INFO 14500 --- [nio-8080-exec-4] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-consumer-test-group-spring-boot-3, groupId=consumer-test-group-spring-boot] Resetting offset for partition TestTopic312-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[192.168.0.74:9092 (id: 1001 rack: null)], epoch=0}}.
2021-10-09 13:46:33.176 INFO 14500 --- [nio-8080-exec-4] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-consumer-test-group-spring-boot-3, groupId=consumer-test-group-spring-boot] Revoke previously assigned partitions TestTopic312-0
2021-10-09 13:46:33.176 INFO 14500 --- [nio-8080-exec-4] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-consumer-test-group-spring-boot-3, groupId=consumer-test-group-spring-boot] Member consumer-consumer-test-group-spring-boot-3-a40c2b95-6068-4806-8a65-370455d9001a sending LeaveGroup request to coordinator 192.168.0.74:9092 (id: 2147482646 rack: null) due to the consumer is being closed
2021-10-09 13:46:33.181 INFO 14500 --- [nio-8080-exec-4] org.apache.kafka.common.metrics.Metrics : Metrics scheduler closed
2021-10-09 13:46:33.181 INFO 14500 --- [nio-8080-exec-4] org.apache.kafka.common.metrics.Metrics : Closing reporter org.apache.kafka.common.metrics.JmxReporter
2021-10-09 13:46:33.181 INFO 14500 --- [nio-8080-exec-4] org.apache.kafka.common.metrics.Metrics : Metrics reporters closed
2021-10-09 13:46:33.183 INFO 14500 --- [nio-8080-exec-4] o.a.kafka.common.utils.AppInfoParser : App info kafka.consumer for consumer-consumer-test-group-spring-boot-3 unregistered
我想实现的主要目标是用户不应该等待超过一分钟,这样我就可以手动设置超时在一个坏的代理地址或没有消息的主题,事实上不存在... .
您希望实现能够及时检测错误的代理和主题吗?代理参数是什么?如果你只想找到错误的主题和代理,你可以设置auto.create.topics.enable=false。当主题不存在时,将报告错误。