使用EmbddedKafka测试Kafka消费者在启动一批测试时失败



我正在Spring Boot中测试我的Kafka Consumer。我的消费者与以下相似

@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaPaymentConsumer {
private final PaymentInterface paymentInterface;
@KafkaListener(topics = "#{'${kafka.topic.payment}'}",
groupId = "#{'${kafka.group-id}'}")
public void consumePaymentEvents(PaymentEvent paymentEvent) {
paymentInterface.handleReceiptPaymentReceivedEvent(paymentEvent);        
}
}

我的测试用例s类似于下面的

@SpringBootTest
@EmbeddedKafka(brokerProperties = {"listeners=PLAINTEXT://localhost:9092"},
partitions = 1,
controlledShutdown = true)
class KafkaPaymentConsumerTest {
@Autowired
KafkaTemplate<String, PaymentEvent> kafkaTemplate;
@Autowired
private ObjectMapper objectMapper;
@Value("${kafka.topic.payment}")
private String paymentTopic;

@SpyBean
private KafkaPaymentConsumer kafkaPaymentConsumer;
@SpyBean
private PaymentInterface paymentInterface;
@Captor
ArgumentCaptor<PaymentEvent> paymentEventCaptor;
private static File PAYMENT_EVENT_JSON = Paths.get("src", "test", "resources", "files",
"Payment.json").toFile();
@Test
@SneakyThrows
@DirtiesContext
void consumePaymentEvents() {
PaymentEvent event = objectMapper.readValue(PAYMENT_EVENT_JSON,
PaymentEvent.class);
kafkaTemplate.send(paymentTopic, "1", event);
verify(kafkaPaymentConsumer, timeout(10000).times(1)).consumePaymentEvents(
paymentEventCaptor.capture());
PaymentEvent argument = paymentEventCaptor.getValue();
verify(paymentInterface, timeout(10000).times(1)).handleReceiptPaymentReceivedEvent(any());
}
}

测试运行良好,但当同时运行一批测试时,某些测试失败!(只有当我同时运行许多测试时!!(@EmbeddedKafka 的上下文中似乎存在问题

我得到像这些日志错误

实际上,与这个mock没有任何交互。

尝试从代理轮询记录时超时

任何解释或建议请

由于您最终不会在测试类上使用@DirtiesContext来关闭应用程序上下文,因此同一主题的其他测试可能会从您那里窃取数据也就不足为奇了。看看你是否能像我解释的那样清理上下文,或者考虑在不同的测试中使用不同的主题。我更喜欢脏的上下文,因为它保证了内存中没有任何额外的资源会导致比赛条件和意外。

最新更新