每当流 B 触发时,我想只停止流 A 的一个通知。两个流都将保持在线状态,并且永远不会完成。
A: o--o--o--o--o--o--o--o--o
B: --o-----o--------o-------
R: o-----o-----o--o-----o--o
或
A: o--o--o--o--o--o--o--o--o
B: -oo----oo-------oo-------
R: o-----o-----o--o-----o--o
这是我
为类似问题做的SkipWhen
运算符的一个版本(不同之处在于,在原版中,多个"B"会跳过多个"A"):
public static IObservable<TSource> SkipWhen<TSource, TOther>(this IObservable<TSource> source,
IObservable<TOther> other)
{
return Observable.Create<TSource>(observer =>
{
object lockObject = new object();
bool shouldSkip = false;
var otherSubscription = new MutableDisposable();
var sourceSubscription = new MutableDisposable();
otherSubscription.Disposable = other.Subscribe(
x => { lock(lockObject) { shouldSkip = true; } });
sourceSubscription.Disposable = source.Where(_ =>
{
lock(lockObject)
{
if (shouldSkip)
{
shouldSkip = false;
return false;
}
else
{
return true;
}
}
}).Subscribe(observer);
return new CompositeDisposable(
sourceSubscription, otherSubscription);
});
}
如果当前实现成为瓶颈,请考虑将锁定实现更改为使用 ReaderWriterLockSlim
。
当可观察量很热(并且没有refCount
)时,此解决方案将起作用:
streamA
.takeUntil(streamB)
.skip(1)
.repeat()
.merge(streamA.take(1))
.subscribe(console.log);
-
.takeUntil(streamB)
:在流B
产生值时使流A
完成。 -
.skip(1)
:使流A
在启动时跳过一个值(或由于.repeat()
)。 -
.repeat()
:使流A
无限期重复(重新连接)。 -
.merge(streamA.take(1))
:偏移流开始时.skip(1)
的效果。
使流每 5 秒跳过一次的示例:
var streamA,
streamB;
streamA = Rx.Observable
.interval(1000)
.map(function (x) {
return 'A:' + x;
}).publish();
streamB = Rx.Observable
.interval(5000);
streamA
.takeUntil(streamB)
.skip(1)
.repeat()
.merge(streamA.take(1))
.subscribe(console.log);
streamA.connect();
您还可以使用此沙盒 http://jsbin.com/gijorid/4/edit?js,console 在运行代码时在控制台日志中执行BACTION()
,以手动将值推送到streamB
(这有助于分析代码)。