TLDR:我在RxJava Observables中进行后台处理,我正在进行集成测试,我希望能够独立等待处理完成,以确保从一个测试开始的后台处理不会干扰另一个测试。
简化后,我有一个@RequestMapping
方法,它可以执行以下操作:
- 在数据库中插入数据
- 启动该数据的异步处理(通过Feign进行http调用,数据库更新)
- 不返回任何内容(
HttpStatus.NO_CONTENT
)
这种异步处理以前是用ThreadPoolTaskExecutor
完成的。我们将过渡到RxJava,并希望删除此ThreadPoolTaskExecutor,并使用RxJava进行后台处理。
所以当时我很天真地尝试这样做:
Observable
.defer(() -> Observable.just(call to long blocking method)
.subscribeOn(Schedulers.io())
.subscribe();
最终目标当然是,一步一个脚印,深入到"调用长阻塞方法",并一直使用Observable。
在此之前,我想先让我的集成测试工作起来。我通过对映射进行RestTemplate调用来测试这一点。由于大部分工作都是异步的,所以我的调用返回得非常快。现在我想找到一种方法来等待异步处理完成(以确保它不会与另一个测试冲突)。
在RxJava之前,我只会计算ThreadPoolTaskExecutor中的任务,并等待它达到0。
如何使用RxJava做到这一点?
我尝试过的:
- 我试图用RxJavaSchedulersHook使我的所有Scheduler立即生效:这会在某个地方造成某种阻塞,代码执行在我的Feign调用之前停止(Feign在后台使用RxJava)
- 我试着用Rx-RxJavaObservableExecutionHook计算任务数:我试着保留订阅,并在isSubcribed=false时删除它们,但这根本不起作用(很多订阅用户,数量永远不会下降)
- 我试图在实际的生产代码中放入observeOn(immediate())。这似乎是可行的,我可以为运行时/测试阶段注入正确的调度程序,但我并不真正热衷于将仅用于测试目的的代码放在我的实际生产代码中
我可能大错特错,或者事情过于复杂,所以不要犹豫,纠正我的理由!
如何返回HttpStatus.NO_CONTENT
?
@RequestMapping(value = "/")
public HttpStatus home() {
Observable.defer(() -> Observable.just(longMethod())
.subscribeOn(Schedulers.io())
.subscribe();
return HttpStatus.NO_CONTENT;
}
在这种形式下,您无法知道longMethod
何时完成。
如果您想知道所有异步作业何时完成,可以使用Spring DefferedResult
或TestSubscriber
在所有作业完成时返回HttpStatus.NO_CONTENT
PS:如果您想要,可以使用Observable.fromCallable(() -> longMethod());
而不是Observable.defer(() -> Observable.just(longMethod());
使用DefferedResult
@RequestMapping(value = "/")
public DeferredResult<HttpStatus> index() {
DeferredResult<HttpStatus> deferredResult = new DeferredResult<HttpStatus>();
Observable.fromCallable(() -> longMethod())
.subscribeOn(Schedulers.io())
.subscribe(value -> {}, e -> deferredResult.setErrorResult(e.getMessage()), () -> deferredResult.setResult(HttpStatus.NO_CONTENT))
return deferredResult;
}
像这样,如果你调用你的方法,你只有在你的可观察完成时(所以,当longMethod
完成时)才能得到结果
使用TestSubscriber
你必须注入一个TestSubscriber
,当要求他等待/检查你的Observable:的完成时
@RequestMapping(value = "/")
public HttpStatus home() {
Observable.defer(() -> Observable.just(longMethod())
.subscribeOn(Schedulers.io())
.subscribe(subscriber); // you'll have to inject this subscriber in your test
return HttpStatus.NO_CONTENT;
}
在你的测试中:
TestSubscriber subscriber = new TestSubscriber(); // you'll have to inject it into your controller
// ....
controller.home();
subscriber.awaitTerminalEvent();
subscriber.assertCompleted(); // check that no error occurred
您可以使用ExecutorServiceAdapter
在RX中从Spring ThreadPoolTaskExecutor
桥接到ExecutorService
,然后执行与以前相同的技巧。
几个月后的游戏:我的建议只是"不要那样做"。RxJava
并不适合这种工作。如果不详细说明,在后台运行大量"松散"的Observable是不合适的:根据请求的数量,你很容易陷入队列和内存问题,更重要的是,如果Web服务器崩溃,所有计划和运行的任务会发生什么?如何重新启动?
Spring提供了其他更好的替代方案imho:Spring Batch、Spring Cloud Task、使用Spring Cloud Stream进行消息传递,所以不要像我那样做,只需为正确的工作使用正确的工具。
现在,如果你真的想走糟糕的路线:
- 返回一个SseEmmitter,只使用使用者服务中SSE的第一个事件,并使用测试中的所有事件
- 创建一个RxJava
lift
运算符,将订阅服务器封装在具有waitForCompletion
方法的父订阅服务器中(在call
方法中)。如何进行等待取决于您(例如CountDownLatch
)。该订阅者将被添加到一个同步列表中(完成后从中删除),在测试中,您只需迭代该列表,并对列表中的每个项目调用waitForCompletion
。它没有那么复杂,我已经开始工作了,但请不要那样做