我有类似的代码。
IPruduceDemUpdates.Subscribe(update => DoUpdate(update));
但我想做的就是这样的事情。
IPruduceDemUpdates.Subscribe(update => if(NoDoUpadteIsRunning) DoUpdate(update));
因此,如果update方法已经在运行,它会忽略传入的更新。此外,它应该始终执行最后一次更新。无论这是流的最后一次更新还是一段时间内的最后一个更新。这里是一个示例时间轴
- 更新1开始
- 忽略更新2
- 忽略更新3
- 忽略更新4
- 更新1已完成
- 更新4开始
- 更新4已完成
编辑
我有跳过的解决方案
IPruduceDemUpdates.Subscribe(update =>
{
if (_task == null || _task.IsCompleted || _task.IsCanceled || _task.IsFaulted)
_task = DoUpdate(update);
});
但我不知道如何确定,最后一次更新会处理。
如果DoUpdate
是同步的(在本例中似乎是同步的),则可以使用Rxx中的BufferIntrospective
。它正是你想要的:
IProduceDemUpdates
.BufferIntrospective()
.Where(items => items.Count > 0) // ignore empty buffers
.Select(items => items[items.Count - 1]) // ignore all but last item in buffer
.Subscribe(DoUpdate);
我参加聚会有点晚了,但这里有一种内置的线性方法。带有TimeSpan.Zero
的Sample
将持续对源进行采样,并始终推送最后一次更新-要实现这一点,您需要给Sample
它自己的线程,以便采样的源可以不间断地运行:
IProduceDemUpdates.Sample(TimeSpan.Zero, NewThreadScheduler.Default)
.Subscribe(DoUpdate);
备注
线程按每个订阅者重用到Sample
,而不是按每个事件生成,因此无需担心创建大量线程。你也可以做.Sample(TimeSpan.Zero, new EventLoopScheduler())
。关键是,订阅服务器在Sample
采样的同一线程上被传递事件,因此Sample
在订阅服务器完成之前不会对下一个事件进行采样。
示例
以下代码:
// emits 0,1,2,3,4,5,6,7,8,9
var source = Observable.Interval(TimeSpan.FromSeconds(1)).Take(10);
source.Sample(TimeSpan.Zero, NewThreadScheduler.Default).Subscribe(x => {
Console.WriteLine(x);
Task.Delay(TimeSpan.FromSeconds(3)).Wait();
});
通常输出:
0
2
5
9
要说服自己总是收到最后一次更新,请尝试:
// emit 0,1,2
var source = Observable.Interval(TimeSpan.FromSeconds(1)).Take(3);
source.Sample(TimeSpan.Zero, NewThreadScheduler.Default).Subscribe(x => {
Console.WriteLine(x);
Task.Delay(TimeSpan.FromSeconds(10)).Wait();
});
将输出:
0
2
洞穴
无论您的方法是什么,请注意在连接到Sample
的订户的同步链中完成最慢的操作是多么重要。如果您在下游有异步订阅(如ObserveOn
),其中包含较慢的运算符,那么您只会将背压转移到其他地方。