订阅了响应式管道,但在单元测试Mockito.verify()中失败(没有记录对mock的调用)



我有这个类:

@Slf4j
@RequiredArgsConstructor
@Service
public class SyncTransactionService {
private final SyncProducerService syncProducerService; // kafka producer
private final CouponService couponService; // db persistence service
private final CouponUpdateMessageMapper mapper; // simple mapper to generate message dto for Kafka
public void processChanges(List<Change> changes) {
Flux.fromIterable(changes)
.map(this::processAndSend)
.doOnError(e -> log.error("Cannot sync coupon with change. ", e))
.subscribeOn(Schedulers.elastic())
.subscribe();
}
private Mono<CouponUpdateMessage> processAndSend(Change change) {
return Mono.fromCallable(() -> change)
.doFirst(() -> log.info("saving or deleting the coupon: {}", change.getChanged()))
.map(this::saveOrDelete)
.thenReturn(mapper.map(change))
.doOnSuccess(message -> log.info("sending message: {}", message))
.doOnSuccess(syncProducerService::send);
}
private Mono<Void> saveOrDelete(Change change) {
if (change.getType() == DELETE) return couponService.deleteCoupon(change.getChanged());
else return couponService.saveCoupon(change.getChanged()).then();
}
}

这个测试:

@ExtendWith(MockitoExtension.class)
class SyncTransactionServiceTest {
@Mock
private SyncProducerService syncProducerService;
@Mock
private CouponService couponService;
@Mock
private CouponUpdateMessageMapper mapper;
@InjectMocks
private SyncTransactionService syncTransactionService;

private static Coupon insertId1;
private static Coupon updateId2;
private static Coupon deleteId3;
private static Change change1;
private static Change change2;
private static Change change3;

@BeforeAll
private static void prepareData() {
insertId1 = DataHelper.coupon();
updateId2 = DataHelper.coupon();
updateId2.setId(2);
deleteId3 = DataHelper.coupon();
deleteId3.setId(3);
change1 = Change.builder().changed(insertId1).type(CouponUpdateType.INSERT).build();
change2 = Change.builder().changed(updateId2).type(CouponUpdateType.UPDATE).build();
change3 = Change.builder().changed(deleteId3).type(CouponUpdateType.DELETE).build();
}
@Test
void shouldProcessChanges() {
// given
List<Change> changes = List.of(change1, change2, change3);
when(couponService.saveCoupon(insertId1)).thenReturn(Mono.just(insertId1));
when(couponService.saveCoupon(updateId2)).thenReturn(Mono.just(updateId2));
when(couponService.deleteCoupon(deleteId3)).thenReturn(Mono.empty());
doNothing().when(syncProducerService).send(any());
doCallRealMethod().when(mapper).map(any());
// when
syncTransactionService.processChanges(changes);
// then
verify(couponService, times(2)).saveCoupon(any());
verify(mapper, times(3)).map(any());
verify(couponService).deleteCoupon(any());
verify(syncProducerService, times(3)).send(any());
}
}

当我运行测试时,Mockito.verify()没有检测到与mock的任何交互,尽管我在代码中有subscribe()

那么,我的管道中可能有什么问题呢?

问题是,由于指定了调度程序,被测试的方法异步运行。您应该从测试中的方法返回Flux,然后使用StepVerifier或调用Flux上的collectList()block()方法来触发并等待执行。

正如@Martin Tarjányi所说,如果要测试一种反应式方法,并且它使用Schedulers.elastic(),它将启动异步作业,而您无法立即完成这些作业,因此我看不到任何交互。

如果我坚持下去,我可以:

  • 等待,直到它完成;(使用https://github.com/awaitility/awaitility库或仅使用Thread.sleep(),如:Awaitility.waitAtMost(Duration.ofMillis(2000)).untilAsserted(() -> {verify(...);});(
  • 或者,返回管道并用StepVerifierblock()进行测试记住,对于通量,使用blockLast()获取所有;CCD_ 13仅发射第一个元素

所以现在是这样的:

...
public Flux<CouponUpdateMessage> processChanges(List<Change> changes) {
return Flux.fromIterable(changes)
.flatMap(this::processAndSend)
.doOnError(e -> log.error("Cannot sync coupon with change. ", e))
.subscribeOn(Schedulers.elastic()); // don't subscribe() here, but return it
}
...

测试:

...
// when
syncTransactionService.processChanges(changes).blockLast(); // process all elements
...

我看到了日志,所有的互动都被记录下来了。


如果我没有义务使用Schedulers.elastic(),我可以简单地使用subscribe(),问题中的测试就会起作用。

最新更新