反应式扩展,立即触发 OnNext,然后在间隔内触发



我有一个网络服务,我想在布尔值设置为 true 后立即查询更新,然后每 5 分钟重新检查一次。

这是我目前的可观察结果:

        _queryDisposable = Observable
            .Interval(TimeSpan.FromMinutes(5))
            .ObserveOn(Scheduler.ThreadPool)
            .Where(i => IsProcessing)     // IsProcessing is the bool value
            .Subscribe(GetFeeds, OnError, OnComplete);

此可观察量将每 5 分钟检查一次IsProcessing布尔值是真还是假,然后调用GetFeeds是否为真。

我能想到获得所需效果的唯一方法是使IsProcessing成为属性支持的字段并使用 2 个可观察量进行处理,如下所示:

    private bool _isProcessing;
    public bool IsProcessing
    {
        get { return _isProcessing; }
        set
        {
            if (_isProcessing == value)
                return;
            _isProcessing = value;
            if (!value)
            {
                if(_queryDisposable != null)
                    _queryDisposable.Dispose();
                _queryDisposable = null;
            }
            else
            {
                Observable
                    .Range(0,1)
                    .ObserveOn(Scheduler.ThreadPool)
                    .Subscribe(GetFeedsSafe, OnError, OnComplete);
                _queryDisposable = Observable
                    .Interval(TimeSpan.FromMinutes(5))
                    .ObserveOn(Scheduler.ThreadPool)
                    .Subscribe(GetFeedsSafe, OnError, OnComplete);
            }
        }
    }

我不认为这是一个非常优雅的解决方案,所以我想知道是否有更好的方法来达到这种效果?

我错过了什么,还是您不会在开始可观察之前立即致电订阅自己?

    Subscribe(GetFeeds, OnError, OnComplete);
    // Or perhaps, if you want it to be async,
    new TaskFactory().StartNew(()=>Subscribe(GetFeeds, OnError, OnComplete));
    _queryDisposable = Observable
        .Interval(TimeSpan.FromMinutes(5))
        .ObserveOn(Scheduler.ThreadPool)
        .Where(i => IsProcessing)     // IsProcessing is the bool value
        .Subscribe(GetFeeds, OnError, OnComplete);

编辑:纯Rx,来自 http://social.msdn.microsoft.com/Forums/sa/rx/thread/c4acaf34-3136-4206-a6f9-ef5afba74b2b:

Observable.Interval(TimeSpan.FromMinutes(5))
        .StartWith(-1L)
        .ObserveOn(Scheduler.ThreadPool)
        .Where(i => IsProcessing)     // IsProcessing is the bool value
        .Subscribe(GetFeeds, OnError, OnComplete);

要立即开始序列并在给定时间内仍然生成值,您有两种选择:

  1. Observable.Interval(TimeSpan.FromMinutes(5)).起始位置(-1)
  2. Observable.Timer(TimeSpan.FromMinutes(0), TimeSpan.FromMinutes(5))

接下来,我假设您想在 IsProcessing 值设置为 false 时取消计时器,您还想在序列设置回 true 时重新启动序列。上面的两个答案以 5 分钟的间隔运行,无论何时设置 IsProcessing 值。如果设置 IsProcessing=true,序列将启动,如果一分钟后设置 IsProcessing=false,然后设置 IsProcessing=true,则仍需等待 4 分钟才能再次查询。我想这不是你想要的吗?

因此,我们希望从属性更改中触发此操作。 @SPFiredrake很好地建议了INPC,并将事件转换为可观察序列。周围有很多小片段可以帮助您做到这一点。假设我们使用一个扩展方法PropertyChanges,我们可以编写这个非常简单的查询。

var isProcessingValues = this.PropertyChanges(x=>x.IsProcessing);
isProcessingValues
    .Where(isProcessing=>isProcessing)
    .SelectMany(_=> 
        Observable.Timer(TimeSpan.Zero, TimeSpan.FromMinutes(5))
                  .TakeUntil(isProcessingValues.Where(isProcessing=>!isProcessing))
    )
    .Select(_=>GetFeeds())  //What does GetFeeds actually return?
    .SubscribeOn(Scheduler.NewThread)
    .Subscribe(feedData=>Console.WriteLine(feedData), 
        ex=>Console.WriteLine (ex),
        ()=>Console.WriteLine (completed));

慢慢地走过这个:

  1. 当 IsProcessing 属性更改时,将新值推送到序列中 (isProcessingValues)
  2. 当序列值为 true 时,启动一个计时器序列,该序列立即每 5 分钟生成一个值。
  3. 如果 isProcessingValues 序列产生的值为 false,则计时器序列应停止
  4. 每次计时器滴答作响时(当 IsProcess 设置为 true 时,每 5 分钟设置为 true 时),调用 GetFeeds
  5. 确保我们的序列在不同的线程上订阅(不阻塞)。如果这不是必需的,请删除。
  6. 从提要中获取数据并对其进行处理。

你有没有想过让 IsProcessing 变量触发一个道具更改事件,然后你可以听?

var processing = Observable
    .FromEventPattern<PropertyChangedEventArgs>(this, "PropertyChanged")
    .Where(tr => tr.EventArgs.Property == "IsProcessing" && ((Type)tr.Sender).IsProcessing);
_queryDisposable = Observable
    .Interval(TimeSpan.FromMinutes(5))
    .ObserveOn(Scheduler.ThreadPool)
    .And(processing) // Will only fire when both sequences have an available value.
    .Subscribe(GetFeeds, OnError, OnComplete);

这也确保了如果它正在处理,它只会触发一次,因为处理可观察一次只提供一个值(对于每个 IsProcessing 更改为 true),所以你不必担心它再次触发如果它仍然在处理 5 分钟后。

相关内容

  • 没有找到相关文章

最新更新