StreamRetryTemplate for Spring Cloud Streams 在集成测试中不重试



我们正在利用Spring Cloud Streams来监听Kafka主题并调用休息服务。我们还实现了一个自定义的 StreamRetryTemplate,以指定我们认为可恢复和不可恢复的错误类型。我无法在运行时的工作方式和集成测试中的工作方式之间获得一致的结果。

我已经在调试模式下验证了异常是否正确抛出,并且 RetryTemplate 正在正确注入,但它似乎没有在我的集成测试中使用。

@EnableBinding(Sink::class)
class MyListener(private val myService: Service) {
  @StreamListener(Sink.Input)
  fun consume(@Payload msg: MyMessage) = myService.process(msg)
  @SteamRetryTemplate
  fun getRetryTemplate() = RetryTemplate()
}

当我运行此应用程序并且myService抛出异常时,我希望重试它,并且它完美地做到了这一点。但是当我使用 wiremock 服务器编写集成测试并让 myService 抛出异常时,它不会重试。我有断言语句来验证我的 wiremock 端点被命中了多少次。

我是否缺少一些专门用于重试在集成测试中工作的内容?

您使用的是测试绑定器还是嵌入式 kafka 代理?测试粘合剂相当有限;使用嵌入式代理是完全集成测试的首选。

请参阅 Spring 中的测试应用程序以获取 Apache Kafka 文档。

编辑

@SpringBootApplication
@EnableBinding(Sink.class)
public class So55855151Application {
    public static void main(String[] args) {
        SpringApplication.run(So55855151Application.class, args);
    }
    @StreamListener(Sink.INPUT)
    public void listen(String in) {
        System.out.println(in);
        throw new RuntimeException("fail");
    }
    @StreamRetryTemplate
    public RetryTemplate retrier() {
        return new RetryTemplate();
    }
}
spring.cloud.stream.bindings.input.group=input
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
@RunWith(SpringRunner.class)
@SpringBootTest
@EmbeddedKafka
public class So55855151ApplicationTests {
    @Autowired
    private KafkaTemplate<byte[], byte[]> template;
    @Autowired
    private RetryTemplate retrier;
    @Test
    public void test() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(5);
        this.retrier.registerListener(new RetryListener() {
            @Override
            public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
                System.out.println("open");
                latch.countDown();
                return true;
            }
            @Override
            public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback,
                    Throwable throwable) {
                System.out.println("close");
                latch.countDown();
            }
            @Override
            public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback,
                    Throwable throwable) {
                System.out.println("onError: " + throwable);
                latch.countDown();
            }
        });
        this.template.send("input", "test".getBytes());
        assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
    }
}

最新更新