响应式管道中的后台任务(即发即弃)



我有一个响应式管道来处理传入的请求。对于每个请求,我需要调用一个业务相关的函数(doSomeRelevantProcessing)。

完成之后,我需要通知一些外部服务发生了什么。这部分管道不应该增加整体响应时间.此外,通知这个外部系统不是业务关键:在管道的主要部分完成后给出快速响应比确保通知成功更重要。

据我所知,在后台运行某些东西而不减慢整个进程的唯一方法是直接在管道中订阅in,从而实现"即发即弃"的心态。

flatmap内部订阅是否有一个好的替代方案?我有点担心,如果通知外部服务的时间比原来的处理时间长,并且大量请求同时进入,可能会发生什么。这会导致内存耗尽或整个进程阻塞吗?

fun runPipeline(incoming: Mono<Request>) = incoming
.flatMap { doSomeRelevantProcessing(it) } // this should not be delayed
.flatMap { doBackgroundJob(it) } // this can take a moment, but is not super critical
fun doSomeRelevantProcessing(request: Request) = Mono.just(request) // do some processing
fun doBackgroundJob(request: Request) = Mono.deferContextual { ctx: ContextView ->
val notification = "notification" // build an object from context
// this uses non-blocking HTTP (i.e. webclient), so it can take a second or so 
notifyExternalService(notification).subscribeOn(Schedulers.boundedElastic()).subscribe()
Mono.just(Unit)
}
fun notifyExternalService(notification: String) = Mono.just(Unit) // might take a while

我回答这个问题的前提是,您使用纯响应式机制通知外部服务——也就是说,您没有封装阻塞服务。如果你是,那么答案将是不同的,因为你受到有限弹性线程池大小的限制,如果每秒有数百个请求传入,它可能很快就会变得不堪重负。

(假设您正在使用响应式机制,那么就不需要您在示例中给出的.subscribeOn(Schedulers.boundedElastic()),因为它不会为您购买任何东西-它是为包装遗留阻塞服务而设计的。)

这会导致内存耗尽吗

只有在非常极端的情况下才有可能,每个单独请求使用的内存将很小。这几乎肯定不值得担心,如果你开始在这里看到内存问题,那么你几乎肯定会受到其他地方的其他问题的打击。

话虽如此,我可能会建议在内部订阅方法之前添加.timeout(Duration.ofSeconds(5))或类似的方法,以确保请求在一段时间后被杀死,如果它们由于任何原因无法工作-这将防止它们积聚。

…或者(这会导致)整个过程受阻吗?

这个更简单——一个简短的no,它不能。

最新更新