我有一个网络服务,我想在布尔值设置为 true 后立即查询更新,然后每 5 分钟重新检查一次。
这是我目前的可观察结果:
_queryDisposable = Observable
.Interval(TimeSpan.FromMinutes(5))
.ObserveOn(Scheduler.ThreadPool)
.Where(i => IsProcessing) // IsProcessing is the bool value
.Subscribe(GetFeeds, OnError, OnComplete);
此可观察量将每 5 分钟检查一次IsProcessing
布尔值是真还是假,然后调用GetFeeds
是否为真。
我能想到获得所需效果的唯一方法是使IsProcessing
成为属性支持的字段并使用 2 个可观察量进行处理,如下所示:
private bool _isProcessing;
public bool IsProcessing
{
get { return _isProcessing; }
set
{
if (_isProcessing == value)
return;
_isProcessing = value;
if (!value)
{
if(_queryDisposable != null)
_queryDisposable.Dispose();
_queryDisposable = null;
}
else
{
Observable
.Range(0,1)
.ObserveOn(Scheduler.ThreadPool)
.Subscribe(GetFeedsSafe, OnError, OnComplete);
_queryDisposable = Observable
.Interval(TimeSpan.FromMinutes(5))
.ObserveOn(Scheduler.ThreadPool)
.Subscribe(GetFeedsSafe, OnError, OnComplete);
}
}
}
我不认为这是一个非常优雅的解决方案,所以我想知道是否有更好的方法来达到这种效果?
我错过了什么,还是您不会在开始可观察之前立即致电订阅自己?
Subscribe(GetFeeds, OnError, OnComplete);
// Or perhaps, if you want it to be async,
new TaskFactory().StartNew(()=>Subscribe(GetFeeds, OnError, OnComplete));
_queryDisposable = Observable
.Interval(TimeSpan.FromMinutes(5))
.ObserveOn(Scheduler.ThreadPool)
.Where(i => IsProcessing) // IsProcessing is the bool value
.Subscribe(GetFeeds, OnError, OnComplete);
编辑:纯Rx,来自 http://social.msdn.microsoft.com/Forums/sa/rx/thread/c4acaf34-3136-4206-a6f9-ef5afba74b2b:
Observable.Interval(TimeSpan.FromMinutes(5))
.StartWith(-1L)
.ObserveOn(Scheduler.ThreadPool)
.Where(i => IsProcessing) // IsProcessing is the bool value
.Subscribe(GetFeeds, OnError, OnComplete);
要立即开始序列并在给定时间内仍然生成值,您有两种选择:
- Observable.Interval(TimeSpan.FromMinutes(5)).起始位置(-1)
- Observable.Timer(TimeSpan.FromMinutes(0), TimeSpan.FromMinutes(5))
接下来,我假设您想在 IsProcessing 值设置为 false 时取消计时器,您还想在序列设置回 true 时重新启动序列。上面的两个答案以 5 分钟的间隔运行,无论何时设置 IsProcessing 值。如果设置 IsProcessing=true,序列将启动,如果一分钟后设置 IsProcessing=false,然后设置 IsProcessing=true,则仍需等待 4 分钟才能再次查询。我想这不是你想要的吗?
因此,我们希望从属性更改中触发此操作。 @SPFiredrake很好地建议了INPC,并将事件转换为可观察序列。周围有很多小片段可以帮助您做到这一点。假设我们使用一个扩展方法PropertyChanges,我们可以编写这个非常简单的查询。
var isProcessingValues = this.PropertyChanges(x=>x.IsProcessing);
isProcessingValues
.Where(isProcessing=>isProcessing)
.SelectMany(_=>
Observable.Timer(TimeSpan.Zero, TimeSpan.FromMinutes(5))
.TakeUntil(isProcessingValues.Where(isProcessing=>!isProcessing))
)
.Select(_=>GetFeeds()) //What does GetFeeds actually return?
.SubscribeOn(Scheduler.NewThread)
.Subscribe(feedData=>Console.WriteLine(feedData),
ex=>Console.WriteLine (ex),
()=>Console.WriteLine (completed));
慢慢地走过这个:
- 当 IsProcessing 属性更改时,将新值推送到序列中 (isProcessingValues)
- 当序列值为 true 时,启动一个计时器序列,该序列立即每 5 分钟生成一个值。
- 如果 isProcessingValues 序列产生的值为 false,则计时器序列应停止
- 每次计时器滴答作响时(当 IsProcess 设置为 true 时,每 5 分钟设置为 true 时),调用 GetFeeds
- 确保我们的序列在不同的线程上订阅(不阻塞)。如果这不是必需的,请删除。
- 从提要中获取数据并对其进行处理。
你有没有想过让 IsProcessing 变量触发一个道具更改事件,然后你可以听?
var processing = Observable
.FromEventPattern<PropertyChangedEventArgs>(this, "PropertyChanged")
.Where(tr => tr.EventArgs.Property == "IsProcessing" && ((Type)tr.Sender).IsProcessing);
_queryDisposable = Observable
.Interval(TimeSpan.FromMinutes(5))
.ObserveOn(Scheduler.ThreadPool)
.And(processing) // Will only fire when both sequences have an available value.
.Subscribe(GetFeeds, OnError, OnComplete);
这也确保了如果它正在处理,它只会触发一次,因为处理可观察一次只提供一个值(对于每个 IsProcessing 更改为 true
),所以你不必担心它再次触发如果它仍然在处理 5 分钟后。