如何基于另一个可观察量重置 RXJS 扫描运算符



我有一个组件,当呈现虚拟列表中的最后一项时,它会触发onScrollEnd事件。此事件将执行新的 API 请求以提取下一页,并使用scan运算符将它们与之前的结果合并。

此组件还具有触发onSearch事件的搜索字段。

触发搜索事件时,如何清除scan运算符的先前累积结果?还是我需要在这里重构我的逻辑?

const loading$ = new BehaviorSubject(false);
const offset$ = new BehaviorSubject(0);
const search$ = new BehaviorSubject(null);
const options$: Observable<any[]> = merge(offset$, search$).pipe(
// 1. Start the loading indicator.
tap(() => loading$.next(true)),
// 2. Fetch new items based on the offset.
switchMap(([offset, searchterm]) => userService.getUsers(offset, searchterm)),
// 3. Stop the loading indicator.
tap(() => loading$.next(false)),
// 4. Complete the Observable when there is no 'next' link.
takeWhile((response) => response.links.next),
// 5. Map the response.
map(({ data }) =>
data.map((user) => ({
label: user.name,
value: user.id
}))
),
// 6. Accumulate the new options with the previous options.
scan((acc, curr) => {
// TODO: Dont merge on search$.next 
return [...acc, ...curr]);
}
);
// Fetch next page
onScrollEnd: (offset: number) => offset$.next(offset);
// Fetch search results
onSearch: (term) => {
search$.next(term);
};

要操作scanstate,您可以编写高阶函数来获取旧状态和新更新。然后与合并运算符合并。通过这种方式,您可以坚持使用干净的面向流的解决方案,而不会产生任何副作用。

const { Subject, merge } = rxjs;
const { scan, map } = rxjs.operators;
add$ = new Subject();
clear$ = new Subject();
add = (value) => (state) => [...state, value];
clear = () => (state) => [];
const result$ = merge(
add$.pipe(map(add)),
clear$.pipe(map(clear))
).pipe(
scan((state, innerFn) => innerFn(state), [])
)
result$.subscribe(result => console.log(...result))
add$.next(1)
add$.next(2)
clear$.next()
add$.next(3)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.3/rxjs.umd.min.js"></script>

此方法可以轻松扩展和/或适应 rxjs 中的其他state用例。

示例(删除最后一项)

removeLast$ = new Subject()
removeLast = () => (state) => state.slice(0, -1);
merge(
..
removeLast$.pipe(map(removeLast)),
..
)

我认为您可以通过重组链来实现您想要的(为了简单起见,我省略了触发加载tap调用):

search$.pipe(
switchMap(searchterm =>
concat(
userService.getUsers(0, searchterm),
offset$.pipe(concatMap(offset => userService.getUsers(offset, searchterm)))),
).pipe(
map(({ data }) => data.map((user) => ({
label: user.name,
value: user.id
}))),
scan((acc, curr) => [...acc, ...curr], []),
),
),
);

来自search$的每个发射都会创建一个新的内部可观察对象,该scan将从空累加器开始。

找到了一个可行的解决方案:我在scan运算符之前使用withLatestFrom检查电流偏移量,并在需要时根据该值重置累加器。

堆栈闪电战演示

这是一个有趣的流。考虑到这一点,offset$ 和 search$ 实际上是 2 个独立的流,但是,具有不同的逻辑,因此应该在最后而不是开头合并。

此外,在我看来,搜索应该将偏移量重置为 0,而我在当前逻辑中看不到这一点。

所以我的想法是:

const offsettedOptions$ = offset$.pipe(
tap(() => loading$.next(true)),    
withLatestFrom(search$),
concatMap(([offset, searchterm]) => userService.getUsers(offset, searchterm)),
tap(() => loading$.next(false)),
map(({ data }) =>
data.map((user) => ({
label: user.name,
value: user.id
})),
scan((acc, curr) => [...acc, ...curr])
);
const searchedOptions$ = search$.pipe(
tap(() => loading$.next(true)),
concatMap(searchTerm => userService.getUsers(0, searchterm)),
tap(() => loading$.next(false)),
map(({ data }) =>
data.map((user) => ({
label: user.name,
value: user.id
})),
);
const options$ = merge(offsettedOptions, searchedOptions);

看看这是否有效或有意义。我可能缺少一些上下文。

我知道它很旧,但我只需要做同样的事情,并有另一种解决方案可以加入其中。

实际上,用户可以触发 2 个操作

const search$ = new Subject<string>();
const offset$ = new Subject<number>();

search$发出之前,我们并不真正关心offset$,此时,我们希望它为 0 以重新开始。我会这样写:

const results$ = search$.pipe( // Search emits
switchMap((searchTerm) => {
return offset$.pipe( // Start watching offset
startWith(0),  // We want a value right away, so set it to 0
switchMap((offset) => {
return userService.getUsers(offset, searchTerm)) // get the stuff
})
)
}))

此时,我们每次发出search$时都会重置偏移量,每当发出offset$时,我们都会进行新的 api 调用以获取所需的资源。如果search$发出,我们需要重置集合,所以我相信正确的位置是在包裹offset$switchMap里面。

const results$ = search$.pipe( // Search emits
switchMap((searchTerm) => {
return offset$.pipe( // Start watching offset
startWith(0),  // We want a value right away, so set it to 0
switchMap((offset) => {
return userService.getUsers(offset, searchTerm)) // get the stuff
}),
takeWhile((response) => response.links.next), // stop when we know there are no more.
// Turn the data in to a useful shape
map(({ data }) => 
data.map((user) => ({
label: user.name,
value: user.id
}))
),
// Append the new data with the existing list
scan((list, response) => {
return [  // merge
...list,
...response
]
}, [])
)
}))

这里最重要的部分是扫描在每次新的search$发射时重置。

最后一点,我会loading$移出水龙头,并单独声明。最终代码应如下所示

const search$ = new Subject<string>();
const offset$ = new Subject<number>();
let results$: Observable<{label: string; value: string}[]>;
results$ = search$.pipe( // Search emits
switchMap((searchTerm) => {
return offset$.pipe( // Start watching offset
startWith(0),  // We want a value right away, so set it to 0
switchMap((offset) => {
return userService.getUsers(offset, searchTerm)) // get the stuff
}),
takeWhile((response) => response.links.next), // stop when we know there are no more.
// Turn the data in to a useful shape
map(({ data }) => 
data.map((user) => ({
label: user.name,
value: user.id
}))
),
// Append the new data with the existing list
scan((list, response) => {
return [  // merge
...list,
...response
]
}, [])
)
}));
const loading$ = merge(
search$.pipe(mapTo(true)), // set to true whenever search emits
offset$.pipe(mapTo(true)),  // set to true when offset emits
results$.pipe(mapTo(false)), // set to false when we get new results
);
results$.subscribe((results) => {
console.log(results);
})

最新更新