RxJS运算符将可观察性组成状态



当涉及到异步执行、状态处理和缩进地狱时,我目前面临着一个非常常见的问题。

假设有一个REST调用请求向用户添加用户信息(user_info(,通知他的联系人该更改,并在响应中返回插入或修改的用户信息。但是用户CCD_ 2通过1到n的关系仅知道其CCD_ 3对象的所有id。

呼叫顺序:

request -> saveUserInfo -> updateUser -> notifyContacts -> response

saveToDbupdateUsernotifyContacts都是执行异步副作用的函数,并且不能简单地组合,因为它们都具有不同的输入和输出并且取决于执行顺序。

以下是函数头的松散表示

function saveUserInfo(payload): Promise<UserInfo[]> { /*...*/ }
function updateUser(user_id, user_infos): Promise<User> { /*...*/ }
function sendNotification(user, user_infos): Promise<void> { /*...*/ }

为了编写这个请求处理程序,我目前正在大量使用mergeMap来订阅执行当前异步操作的内部可观察对象。它看起来像这样:

function handleRequest(payload, user_id) {
return saveUserInfo(payload).pipe(
mergeMap(user_infos =>
from(updateUser(user_id, user_infos)).pipe(
mergeMap(user =>
from(notifyContacts(user, user_infos)).pipe(map(() => user_infos))
)
)
)
)
}

我对这个解决方案不满意,因为将来会有更多的逻辑添加到这个解决方案中,我知道这在某个时候可能会变得麻烦。

因此,我搜索了RxJS文档,遗憾的是,没有找到一种聪明的方法来以更令人愉快的方式将这些异步调用管道连接在一起。最大的问题是,我真的无法用几句话完全阐述这个问题,这可能会导致我写下这个问题。

有人知道更好的解决方案吗?我确信一定有什么!

最好没有辅助功能,并且是纯RxJS。

当您使用高阶映射运算符(例如mergeMap及其友元(时,如果提供的回调返回promise,则不必执行() => from(promise)mergeMap会自动为您完成。

考虑到这一点,我会改变这个:

function handleRequest(payload, user_id) {
return saveUserInfo(payload).pipe(
mergeMap(user_infos =>
from(updateUser(user_id, user_infos)).pipe(
mergeMap(user =>
from(notifyContacts(user, user_infos)).pipe(map(() => user_infos))
)
)
)
)
}

进入这个

function handleRequest(payload, user_id) {
return from(saveUserInfo(payload)) // `saveUserInfo(payload)` returns a promise
.pipe(
mergeMap(user_infos => forkJoin(of({ user_infos }), updateUser(user_id, user_infos)),
mergeMap(([acc, user]) => forkJoin(of({ ...acc, otherPropsHere: true }), notifyContacts(user, user_infos))),
map(([acc]) => acc['user_infos'])
),
)
}

forkJoin将在其提供的所有ObservableInput发射至少一个值并完成之后发射阵列

正如您所看到的,返回数组的第一个元素是累加器。

由于我在互联网上找不到任何有用的东西,我想出了一个更高阶的函数,为我提供了一个管道运算符,这可能会解决我的问题,我称之为sourceMap

它应该通过mergeMap订阅一个内部可观察对象,并将其结果合并到一个键控状态对象中。

const sourceMap = <S, T extends Record<string, any>>(
predicate: (acc: T) => Promise<S> | Observable<S>,
key?: keyof T
) => {
return mergeMap((acc: T) =>
from(predicate(acc)).pipe(map(res => (key ? { ...acc, [key]: res } : acc)))
)
}

实现方式如下:

function handleRequest(payload, user_id): Observable<UserInfo[]> {
const state$ = of({ user_infos: [] as UserInfo[], user: {} as User })
return state$.pipe(
sourceMap(() => saveUserInfo(user_id, payload), 'user_info'),
sourceMap(({ user_info }) => updateUser(user_id, user_info), 'user'),
sourceMap(({ user_info, user }) => notifyContacts(user, user_info)),
pluck('user_info')
)
}

仍在寻找更好的解决方案!

您也可以这样做:

function saveUserInfo(payload: Payload): Promise<UserInfo[]>;
function updateUser(user_id: string, user_infos: UserInfo[]): Promise<User>;
function sendNotification(user: User, user_infos: UserInfo[]): Promise<void>;
function handleRequest(payload: Payload, user_id: string): Observable<UserInfo[]> {
return from(saveUserInfo(payload)).pipe(
switchMap(user_infos => updateUser(user_id, user_infos).then(user => ({ user, user_infos }))),
switchMap(({ user, user_infos }) => sendNotification(user, user_infos).then(() => user_infos))
);
}

最新更新