如何在流 B 触发时抑制流 A 的下一个事件



每当流 B 触发时,我想只停止流 A 的一个通知。两个流都将保持在线状态,并且永远不会完成。

A: o--o--o--o--o--o--o--o--o  
B: --o-----o--------o-------  
R: o-----o-----o--o-----o--o  

A: o--o--o--o--o--o--o--o--o  
B: -oo----oo-------oo-------  
R: o-----o-----o--o-----o--o  
这是我

为类似问题做的SkipWhen运算符的一个版本(不同之处在于,在原版中,多个"B"会跳过多个"A"):

public static IObservable<TSource> SkipWhen<TSource, TOther>(this IObservable<TSource> source, 
    IObservable<TOther> other)
{
    return Observable.Create<TSource>(observer =>
    {
        object lockObject = new object();
        bool shouldSkip = false;
        var otherSubscription = new MutableDisposable();
        var sourceSubscription = new MutableDisposable();
        otherSubscription.Disposable = other.Subscribe(
            x => { lock(lockObject) { shouldSkip = true; } });
        sourceSubscription.Disposable = source.Where(_ =>
        {
            lock(lockObject)
            {
                if (shouldSkip)
                {
                    shouldSkip = false;
                    return false;
                }
                else
                {
                    return true;
                }
            }
        }).Subscribe(observer);
        return new CompositeDisposable(
            sourceSubscription, otherSubscription);
    });
}

如果当前实现成为瓶颈,请考虑将锁定实现更改为使用 ReaderWriterLockSlim

当可观察量很热(并且没有refCount)时,此解决方案将起作用:

streamA
    .takeUntil(streamB)
    .skip(1)
    .repeat()
    .merge(streamA.take(1))
    .subscribe(console.log);
  1. .takeUntil(streamB):在流B产生值时使流A完成。
  2. .skip(1):使流A在启动时跳过一个值(或由于.repeat())。
  3. .repeat() :使流A无限期重复(重新连接)。
  4. .merge(streamA.take(1)) :偏移流开始时.skip(1)的效果。

使流每 5 秒跳过一次的示例:

var streamA,
    streamB;
streamA = Rx.Observable
    .interval(1000)
    .map(function (x) {
        return 'A:' + x;
}).publish();
streamB = Rx.Observable
    .interval(5000);
streamA
    .takeUntil(streamB)
    .skip(1)
    .repeat()
    .merge(streamA.take(1))
    .subscribe(console.log);
streamA.connect();

您还可以使用此沙盒 http://jsbin.com/gijorid/4/edit?js,console 在运行代码时在控制台日志中执行BACTION(),以手动将值推送到streamB(这有助于分析代码)。

相关内容

  • 没有找到相关文章

最新更新