RxJS:合并两个可观察对象,这样可观察对象只有在第一个可观察对象的值为true时才会发出



我正在尝试使用可观察对象实现一个简单的离线/在线同步机制。

基本上,我有两个可观察对象:

  1. 连接可观察到的:第一个可观察到的给我的信息是否有一个互联网连接。当网络状态改变时发出
  2. Data observable:第二个observable中有需要同步的数据。当有新数据要同步时发出

我想实现的是将上述可观察对象组合起来,以便:

  • 只要连接状态为false,组合的可观察对象就不应该发出。在这种情况下,数据可观察对象应该保持它的状态
  • 只要连接状态为true,每当数据可观察对象
  • 中有数据时,合并的可观察对象就会发出。
  • 如果连接状态从false切换到true,则对可观测数据上的每个值发出

当前使用filter和combinellatest的一个小示例可以在这里找到:https://codesandbox.io/s/offline-sync-s5lv49?file=/src/index.js

不幸的是,这根本没有按照预期的方式运行。

是否有任何操作符来实现所需的行为?作为一种替代方法,我可以轮询连接状态,每隔X秒发出一次。但理想情况下,我想要一个清晰的可观察对象组合,尽管我有点不知道哪个操作符最有意义。

澄清一下:我需要同步所有的数据,而不仅仅是最新的。所以数据可观察对象应该缓冲数据。

看起来我们在正确的道路上,可以做下面的事情:

combineLatest([
offlineOnlineSubject,
dataSubject
])
.pipe(
filter(([online, counter]) => online),
)
.subscribe(([online, counter]) => {
syncedIndicator.textContent = counter;
});

所以当我们处于offline模式时,我将所有发出的数据存储在缓冲区中,一旦我们从offline更改为online,我只是在缓冲区中发出所有内容:

let dataBuffer = [];
let isPreviouslyOnline = false;
combineLatest([dataSubject, offlineOnlineSubject])
.pipe(
filter(([data, isOnline]) => {
if (!isOnline) {
if (!isPreviouslyOnline) {
dataBuffer.push(data);
}
isPreviouslyOnline = false;
return false;
}
return true;
}),
switchMap(([data]) => {
isPreviouslyOnline = true;
if (dataBuffer.length > 0) {
const tempData = [...dataBuffer];
dataBuffer = [];
return from(tempData);
} else {
return of(data);
}
})
)
.subscribe((data) => {
console.log("Data is: ", data);
});

代码沙箱:https://codesandbox.io/s/offline-sync-forked-k38kc3?file=/src/index.js

我认为它可以工作,但感觉不太好,我试图使用本地rxjs缓冲区操作符来实现它,但无法弄清楚如何。希望看到如果有谁有更好的/更简洁的解决方案。

后还在寻找这个词"gate",我发现以下堆栈溢出问题,文章:rxjs的条件发射延迟

基本上,答案是使用delayWhen来达到期望的结果。

我在这里更新了一个例子:https://codesandbox.io/s/offline-sync-experiments-nimoox?file=/src/index.js:0-1357

关键部分是:

const offlineOnlineSubject = new BehaviorSubject(false);
const dataSubject = new Subject();
const triggerFn = (_) => offlineOnlineSubject.pipe(filter((v) => v));
dataSubject.pipe(delayWhen(triggerFn)).subscribe((counter) => {
console.log("Syncing data", {
counter
});
syncedIndicator.innerHTML += `<li>${counter}</li>`;
});

用自定义typescript操作符包装:

import { MonoTypeOperatorFunction, Observable } from 'rxjs';
import { delayWhen, filter } from 'rxjs/operators';
export function gate<T>(gateTrigger: Observable<boolean>): MonoTypeOperatorFunction<T> {
const gateTriggerFn = () => gateTrigger.pipe(
filter((v) => v)
);
return (source: Observable<T | null | undefined>) => source.pipe(
delayWhen(gateTriggerFn)
);
}

到目前为止,这个解决方案似乎正在做我想做的事情。

最新更新