Rx observable,在某个超时到期时发布一个值



我有一个方法,它返回一个可观察的。这个可观察的应该(如果每件事都正常(每秒发布一个值。我想做的是,如果某个时间过去了,没有输出,让它发布一些自定义警报值。

private IObservable<string> GetStatus()
{
    return statusProvider
                .Subscribe(statusKey)  //returns an IObservable<string>
                .Select(st => st.ToUpper())
                .DistinctUntilChanged()
                .TakeUntil(disposed)
                .Replay(1)
                .RefCount();
}

有没有一种简单的方法可以让我修改上面的内容,这样,如果30秒内没有状态更新,statusProvider就会发布"坏",然后如果之后真的有更新,它会像往常一样发布,计时器会重新启动到30秒?

这里有一种方法。启动一个计时器,该计时器到期后将产生"坏"结果。每次您的statusProvider生成状态时,计时器都会重置。

var statusSignal = statusProvider
            .Subscribe(statusKey)  //returns an IObservable<string>
            .Select(st => st.ToUpper())
            .Publish()
            .RefCount();
// An observable that produces "bad" after a delay and then "hangs indefinately" after that
var badTimer = Observable
    .Return("bad")
    .Delay(TimeSpan.FromSeconds(30))
    .Concat(Observable.Never<string>());
// A repeating badTimer that resets the timer whenever a good signal arrives.
// The "indefinite hang" in badTimer prevents this from restarting the timer as soon
// as it produces a "bad".  Which prevents you from getting a string of "bad" messages
// if the statusProvider is silent for many minutes.
var badSignal = badTimer.TakeUntil(statusSignal).Repeat();
// listen to both good and bad signals.
return Observable
    .Merge(statusSignal, badSignal)
    .DistinctUntilChanged()
    .Replay(1)
    .RefCount();

我认为以下内容应该有效。它使用Throttle,它将等待30秒,直到没有输入后再发送任何内容。然后,你可以将其与你先前存在的来源合并,以获得你想要的行为。

var bad = source
    .Throttle(TimeSpan.FromSeconds(30))
    .Select(_ => "bad");
var merged = source.Merge(bad);

这很容易做到:

    source
        .Select(x =>
            Observable
                .Interval(TimeSpan.FromSeconds(30.0))
                .Select(y => "bad")
                .StartWith(x))
        .Switch();

我已将您的查询简化为source。在每一个值出现时,你都会开始一个可观察的区间,该区间产生";坏的";每30秒,但您使用传入值启动可观测值。那么你只需要.Switch()就可以观察到了。

相关内容

  • 没有找到相关文章