I have a simple producer class defined as follows:
@Configuration
public class MyKafkaProducer {

    private static final Logger log = LoggerFactory.getLogger(MyKafkaProducer.class);

    @Value("${my.kafka.producer.topic}")
    private String topic;

    @Autowired
    KafkaTemplate<String, String> kafkaTemplate;

    public void sendDataToKafka(@RequestParam String data) {
        ListenableFuture<SendResult<String, String>> listenableFuture = kafkaTemplate.send(topic, data);
        listenableFuture.addCallback(new ListenableFutureCallback<>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                log.info("Sent data {}", result.getProducerRecord().value());
            }

            @Override
            public void onFailure(Throwable ex) {
                log.error("Unable to send data {} due to: {}", data, ex.getMessage());
            }
        });
    }
}
And here is the test class, still a work in progress:
@EmbeddedKafka
@ExtendWith(SpringExtension.class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class MyKafkaProducerTest {

    private static final String TOPIC = "device";

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @Autowired
    private MyKafkaProducer producer;

    BlockingQueue<ConsumerRecord<String, String>> records;
    KafkaMessageListenerContainer<String, String> container;

    @BeforeAll
    void setUp() {
        Map<String, Object> configs = new HashMap<>(KafkaTestUtils.consumerProps("consumer", "false", embeddedKafkaBroker));
        DefaultKafkaConsumerFactory<String, String> consumerFactory =
                new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(), new StringDeserializer());
        ContainerProperties containerProperties = new ContainerProperties(TOPIC);
        container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
        records = new LinkedBlockingQueue<>();
        container.setupMessageListener((MessageListener<String, String>) records::add);
        container.start();
        ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic());
    }

    @AfterAll
    void tearDown() {
        container.stop();
    }

    @Test
    public void testIfWorks() throws InterruptedException {
        // Arrange
        Map<String, Object> configs = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
        Producer<String, String> producer =
                new DefaultKafkaProducerFactory<>(configs, new StringSerializer(), new StringSerializer()).createProducer();

        // Act
        producer.send(new ProducerRecord<>(TOPIC, "my-aggregate-id", "{\"event\":\"Test Event\"}"));
        producer.flush();

        // Assert
        ConsumerRecord<String, String> singleRecord = records.poll(100, TimeUnit.MILLISECONDS);
        assertThat(singleRecord).isNotNull();
        assertThat(singleRecord.key()).isEqualTo("my-aggregate-id");
        assertThat(singleRecord.value()).isEqualTo("{\"event\":\"Test Event\"}");
    }
}
The problem is that the test creates a plain default producer:
Producer<String, String> producer = new DefaultKafkaProducerFactory<>(configs, new StringSerializer(), new StringSerializer()).createProducer();
How can I use my own MyKafkaProducer and call its sendDataToKafka method instead? And in that case, what can and should the test actually verify?
The source code can be found here. The branch with the work-in-progress test is here. Many thanks.
So it is a Spring Boot application, and you are using the auto-configured KafkaTemplate.
To override the bootstrap-servers property so that it points at the embedded Kafka broker, see the @EmbeddedKafka annotation section of the Spring for Apache Kafka reference: https://docs.spring.io/spring-kafka/docs/2.5.5.RELEASE/reference/html/#kafka-testing-embeddedkafka-annotation
@EmbeddedKafka(topics = "someTopic",
        bootstrapServersProperty = "spring.kafka.bootstrap-servers")
Then you can simply call your producer from the test case.
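For example, a minimal sketch of how the adapted test could look. This assumes a @SpringBootTest context so the auto-configured KafkaTemplate is available, reuses the BlockingQueue/listener-container consumer setup from the question, and sets my.kafka.producer.topic via a test property; the class name, topic value, and property wiring are illustrative, not taken from the original project:

@SpringBootTest
@EmbeddedKafka(topics = "device",
        bootstrapServersProperty = "spring.kafka.bootstrap-servers")
@TestPropertySource(properties = "my.kafka.producer.topic=device")
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class MyKafkaProducerIT {

    @Autowired
    private MyKafkaProducer producer;

    BlockingQueue<ConsumerRecord<String, String>> records;
    KafkaMessageListenerContainer<String, String> container;

    // setUp()/tearDown() building the listener container and filling `records`
    // stay exactly as in the question's test class.

    @Test
    void sendDataToKafkaPublishesRecord() throws InterruptedException {
        // Act: exercise the production code instead of a hand-rolled Producer
        producer.sendDataToKafka("{\"event\":\"Test Event\"}");

        // Assert: the record arrives on the embedded broker; the send is
        // asynchronous, so poll with a generous timeout
        ConsumerRecord<String, String> singleRecord = records.poll(10, TimeUnit.SECONDS);
        assertThat(singleRecord).isNotNull();
        assertThat(singleRecord.value()).isEqualTo("{\"event\":\"Test Event\"}");
    }
}

What is worth asserting here is the observable behaviour: that calling sendDataToKafka results in a record with the expected value (and key, if you set one) on the configured topic of the embedded broker.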