使用observable值发出返回另一个observable的请求并推送给Kafka



我在Service中使用NestJS和内置的HttpModule,最终目标是将Observable<AxiosResponse<any>>返回的值推送给Kafka主题。

我的Observables是来自HttpService(HttpModule的提供者(的响应,它调用JSON-RPC请求(HTTPPOST请求(,例如如下所示:

getInfo(): Observable<AxiosResponse<any>> {
return this.httpService.post(this.url, { "method" : "somemethod" } )
}

我知道存储Observable的值被认为不是一种好的做法,因为它不符合函数式编程范式。

但是,如果我需要存储来自AxiosResponse<any>的值,并将其输入到另一个httpService.post请求中,该怎么办?我该怎么做?

我已经尝试在我的Service中使用一个单独的函数,它返回一个Observable,我想在这个函数中使用getInfo()并订阅结果,所以我可以通过这种方式将一个值从第一个Observable传递到第二个Observable

getMoreInfo(): Observable<AxiosResponse<any>> {
this.getInfo().pipe(
take(1))
.subscribe(
(response) => {
return this.httpService.post(this.url, {
"method" : "othermethod",
"params": {
"myparam" : response.data.result.param
}
})
}
)
}

但是,因为return在第一个Observable的范围内,所以我的函数无效,因为它没有有效的return类型Observable<AxiosResponse<any>>

我想检查嵌套逻辑是否正确,事实上是这样,因为我可以这样做:

getMoreInfo() {
this.getInfo().pipe(
take(1))
.subscribe(
(response) => {
this.httpService.post(this.url, {
"method" : "othermethod",
"params": {
"stringparam" : response.data.result.param.toString()
}
}).subscribe((secondResponse) => { console.log(secondResponse) }                         
}
)
}

因为我想要我的NestJsControllersubscribeObservables,并且只让Service创建Observables,所以我必须returnObservable<AxiosResponse<any>>。我该怎么做?

实际上,我只想获取Observable的输出,并将其传递到另一个函数中。如果我知道如何做到这一点,我可以将其传递给负责相应地推送到Kafka的函数,并且我将能够将该值传递给getMoreInfo()函数。

感谢@Drenai为我指明了正确的方向!

我能够使用mergeMap运算符来解决它。

解决方案如下:

getMoreInfo() {
this.getInfo().pipe(
mergeMap((res) => this.httpService.post(this.url, {
"method" : "somemethod",
"params" : {
"stringparam" : res.data.result.myparam.toString()
}
})
))
}

最新更新