Spring启动Kafka集成测试:消费者总是返回0记录



对于下面的测试,我总是得到错误:

org.opentest4j。AssertionFailedError: expected: 10, but was: 0

我到底想在测试范围内验证什么:

我正试图发送10条消息到Kafka,之后我立即试图从Kafka读取这些消息,但由于一些未知的原因KafkaConsumer返回0记录,我正在努力理解为什么消费者不能读取早先发送的消息?

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.ContextConfiguration;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.stream.IntStream;
@SpringBootTest
@ActiveProfiles("test")
@ContextConfiguration(initializers = TestKafkaContextInitializer.class)
@Slf4j
public class KafkaFlowVerificationITest {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
private KafkaProperties kafkaProperties;
private final String kafkaTopic = "test.topic";
@Test
void testKafkaFlow() {
IntStream.range(0, 10)
.forEach(e -> {
try {
kafkaTemplate.send(kafkaTopic, UUID.randomUUID().toString()).get();
} catch (InterruptedException | ExecutionException ex) {
ex.printStackTrace();
}
});
checkKafkaForMessage();
}
private void checkKafkaForMessage() {
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class);

properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "acme");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(List.of(kafkaTopic));
ConsumerRecords<String, String> records = consumer.poll(Duration.ZERO);
Assertions.assertThat(records.count()).isEqualTo(10);
}
}

TestKafkaContextInitializer:

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.test.util.TestPropertyValues;
import org.springframework.context.ApplicationContextInitializer;
import org.springframework.context.ConfigurableApplicationContext;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;
@Slf4j
public class TestKafkaContextInitializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {
private final KafkaContainer kafkaContainer =
new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.4.3"));
@Override
public void initialize(ConfigurableApplicationContext configurableApplicationContext) {
kafkaContainer.start();
var values = TestPropertyValues.of(
"spring.kafka.producer.bootstrap-servers=" + kafkaContainer.getBootstrapServers(),
"spring.kafka.consumer.bootstrap-servers=" + kafkaContainer.getBootstrapServers()
);
values.applyTo(configurableApplicationContext);
}
}

问题的根本原因是:

我设置TestContainerKafkabootstrap server url为以下属性:

spring.kafka.producer.bootstrap-servers

spring.kafka.consumer.bootstrap-servers

,但在测试中我使用:spring.kafka.bootstrap-servers属性,说明为什么消费者尝试连接localhost:9092的默认URL,而不是TestContainer提供的URL。

最新更新