Spring Boot REST service: Kafka topic commitSync fails



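I have a simple Spring Boot service that is called on demand and consumes a specified number of messages from a topic. The number of messages to consume is passed as a parameter. The service is called every 30 minutes. Each message is about 1.6 KB, and I receive roughly 1100 to 1200 messages each time. There is only one topic with one partition, and the REST service is the only consumer. This is how the service is called: http://example.com/messages?limit=2000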

private OutputResponse getNewMessages(String limit) throws Exception {

    System.out.println("***** START *****");

    final long start = System.nanoTime();

    int loopCntr = 0;
    int counter = 0;
    OutputResponse outputResponse = new OutputResponse();
    Output output = new Output();
    List<Output> rspListObject = new ArrayList<>();
    Consumer<Object, Object> consumer = null;
    String result = null;
    try {
        Properties p = new Properties();
        p.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "180000");
        p.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, limit);

        consumer = consumerFactory.createConsumer("my-group-id", null, null, p);
        consumer.assign(Collections.singleton(new TopicPartition("test-topic", 0)));
        while (loopCntr < 2) {
            loopCntr++;
            ConsumerRecords<Object, Object> consumerRecords = consumer.poll(Duration.ofSeconds(15));

            for (ConsumerRecord<Object, Object> record : consumerRecords) {
                counter++;
                try {
                    // get json string
                    result = mapper.writeValueAsString(record.value());
                    // to json
                    output = mapper.readValue(result, Output.class);
                    rspListObject.add(output);
                } catch (Exception e) {
                    logger.error(e);
                    insertToDB(record.value(), record.offset());
                }
            }
        }
        outputResponse.setObjects(rspListObject);

        final long end = System.nanoTime();
        System.out.println("Took: " + ((end - start) / 1000000) + "ms");
        System.out.println("Took: " + (end - start) / 1000000000 + " seconds");
        // commit the offset of records to broker
        if (counter > 0) {
            consumer.commitSync();
        }
    } finally {
        try {
            System.out.println(" >>>>> closing the  consumer");
            if (consumer != null)
                consumer.close();
        } catch (Exception e) {
            // log message
        }
    }
    return outputResponse;
}

This is my application.yml:

spring:
  kafka:
    consumer:
      enable-auto-commit: false
      auto-offset-reset: latest
      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:
        spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
        spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
        spring.json.trusted.packages: '*'
        max.poll.interval.ms: 300000
      group-id: my-group-id

ConsumerConfig values:

allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
check.crcs = true
client.dns.lookup = default
client.id = 
client.rack = 
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = my-group-id
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 180000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100

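This is the error I get on commitSync. I tried consuming only 5 messages per poll(), and I tried setting p.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "180000"); but the error is the same.

Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.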

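I believe this application simulates your use case, but it does not exhibit the behavior you describe (as I would expect). When you assign topics/partitions manually, you should never see a rebalance.

I suggest you run both and compare the DEBUG logs to find out what is different.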

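import java.time.Duration;
import java.util.Collections;
import java.util.stream.IntStream;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootApplication
public class So63713473Application {

    public static void main(String[] args) {
        SpringApplication.run(So63713473Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so63713473").partitions(1).replicas(1).build();
    }

    @Bean
    public ApplicationRunner runner(ConsumerFactory<String, String> factory, KafkaTemplate<String, String> template) {
        // 1600-character payload, close to the 1.6 KB message size in the question
        String msg = new String(new byte[1600]);
        return args -> {
            while (true) {
                System.out.println("Hit enter to run a consumer");
                System.in.read();
                int count = 0;
                try (Consumer<String, String> consumer = factory.createConsumer("so63713473", "")) {
                    // publish 1200 records, then read them back with a manually assigned consumer
                    IntStream.range(0, 1200).forEach(i -> template.send("so63713473", msg));
                    consumer.assign(Collections.singletonList(new TopicPartition("so63713473", 0)));
                    while (count < 1200) {
                        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
                        count += records.count();
                        System.out.println("Count=" + count);
                    }
                    consumer.commitSync();
                    System.out.println("Success");
                }
            }
        };
    }
}

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.fetch-min-size=1920000
spring.kafka.consumer.fetch-max-wait=1000
spring.kafka.producer.properties.linger.ms=50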
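
The fetch-min-size value appears to correspond to the total payload the test app publishes per run (1200 messages of 1600 bytes each). The following is only a small arithmetic sketch of that relationship. The class and method names are hypothetical, the 1048576 figure is the max.partition.fetch.bytes value from the question's ConsumerConfig dump, and record and batch overhead is ignored.

```java
// Hypothetical helper for checking the numbers; not part of the test app.
final class FetchSizeCheck {

    static final int MESSAGE_COUNT = 1200;                   // records sent per run
    static final int MESSAGE_BYTES = 1600;                   // payload built from new byte[1600]
    static final int MAX_PARTITION_FETCH_BYTES = 1_048_576;  // value shown in the question's dump

    // Total payload bytes published per run (overhead ignored).
    static int totalPayloadBytes() {
        return MESSAGE_COUNT * MESSAGE_BYTES;
    }

    // Rough number of whole payloads that fit under the per-partition fetch limit.
    static int payloadsPerPartitionFetch() {
        return MAX_PARTITION_FETCH_BYTES / MESSAGE_BYTES;
    }

    public static void main(String[] args) {
        System.out.println("total payload bytes: " + totalPayloadBytes());
        System.out.println("payloads per partition fetch: " + payloadsPerPartitionFetch());
    }
}
```

Under these assumptions the 1200 records do not fit under the per-partition fetch limit at once, which is consistent with the test app polling in a loop until the count reaches 1200.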

EDIT

I can reproduce your problem by adding a second (auto-assigned) consumer in the same group.

@KafkaListener(id = "so63713473", topics = "so63713473")
public void listen(String in) {
    System.out.println(in);
}

2020-09-08 16:40:15.828 ERROR 88813 --- [           main] o.s.boot.SpringApplication               : Application run failed
java.lang.IllegalStateException: Failed to execute ApplicationRunner
    at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:789) [spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:776) [spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:322) [spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237) [spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) [spring-boot-2.3.3.RELEASE.jar:2.3.3.RELEASE]
    at com.example.demo.So63713473Application.main(So63713473Application.java:25) [classes/:na]
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

You cannot mix manual assignment and automatic assignment in the same group.
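
One way to act on this, sketched under assumptions, is to give the manually assigned consumer a group id that no subscribing consumer (such as a @KafkaListener) uses. The helper below is hypothetical and uses only the JDK: the class name, the method names and the "-manual" suffix are illustrative choices, not part of Spring Kafka. The property keys are the plain Kafka consumer config names that appear in the ConsumerConfig dump in the question.

```java
import java.util.Properties;

// Hypothetical helper: builds the arguments for a manually assigned consumer
// so that it does not share a group with any auto-assigned consumer.
final class ManualConsumerOverrides {

    // Assumed naming convention; any id works as long as no subscribing
    // consumer uses the same one.
    static String dedicatedGroupId(String listenerGroupId) {
        return listenerGroupId + "-manual";
    }

    // Per-call overrides, mirroring the ones set in the question's code.
    static Properties overrides(int limit) {
        if (limit <= 0) {
            throw new IllegalArgumentException("limit must be positive");
        }
        Properties p = new Properties();
        p.setProperty("max.poll.records", Integer.toString(limit));
        p.setProperty("max.poll.interval.ms", "180000");
        return p;
    }

    public static void main(String[] args) {
        String groupId = dedicatedGroupId("my-group-id");
        Properties p = overrides(2000);
        // In the service these values would be passed as
        // consumerFactory.createConsumer(groupId, null, null, p)
        System.out.println(groupId + " " + p.getProperty("max.poll.records"));
    }
}
```

Note that offsets are committed per group, so a consumer that switches to a new group id has no committed offset yet and starts from the position chosen by auto.offset.reset.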
