我们有一个通知数据更改的源,当有项进来时,我们异步获取新数据。
source.SelectMany(async n => { await FetchData()});
在等待加载数据时,可能会有许多通知进来,但我们想要忽略除1之外的所有通知,这样我们就不必为每个通知获取数据,而只需再读取一次。
在获取数据之前,我们如何忽略除1之外的所有来自source的通知?
我有一种感觉,解决方案将涉及将FetchData()转换为IObservable,但我仍然不知道什么Rx原语将允许我们组合流
看起来像一个非常经典的(但缺失的)Rx操作符:ObserveLatestOn
(示例实现在这里,但你可以在网上找到其他)。
source.ObserveLatestOn(TimeSpan.Zero, Schedulers.NewThread).SelectMany(async n => { await FetchData()})
请注意,此实现仅在单线程调度器上进行了测试(主要是UI,但将与NewThread一起工作),而不是与Immediate
/CurrentThread
(可能工作)或TaskPool
(可能有竞争条件)
还要注意,你在这里击中的是Rx中缺乏反应性拉背压。RxJava对这种情况有很好的回压支持(例如onBackpressureLatest)
我确信有一种方法可以用Rx做到这一点,但我想到的一个简单的解决方案是使用AsyncAutoResetEvent (AutoResetEvent的异步版本)。
基本上,您创建了一个异步等待AsyncAutoResetEvent设置的循环,该设置在接收到新通知时完成。自动重置确保在下一次等待时,您将被异步阻塞,直到收到新通知。
你可以在Stephen Cleary AsyncEx作为Nuget包创建的优秀库中找到AsyncAutoResetEvent类。
下面是一个简单的程序,展示了建议的解决方案:
class Program
{
static readonly AsyncAutoResetEvent _resetEvent = new AsyncAutoResetEvent();
static void Main(string[] args)
{
// Start the asynchronous fetching loop...
RunAsync();
Task.Run(async () =>
{
// Simulate fast notifications
for (int i = 0; i < 15; i++)
{
OnNotification(i);
await Task.Delay(100);
}
// Simulate a pause of notifications
await Task.Delay(2000);
// Simulate fast notifications
for (int i = 0; i < 15; i++)
{
OnNotification(i);
await Task.Delay(100);
}
});
Console.ReadKey();
}
static void OnNotification(int index)
{
Console.WriteLine(DateTime.Now.ToLongTimeString() + " OnNotification " + index);
// This will unlock the current or next WaitAsync on the _resetEvent
_resetEvent.Set();
}
static async Task RunAsync()
{
// Uncomment this if you want to wait for a first notification before fetching.
// await _resetEvent.WaitAsync();
while (true)
{
Console.WriteLine(DateTime.Now.ToLongTimeString() + " Fetching...");
// Simulate long fetching
await Task.Delay(1000);
// Wait for a new notification before doing another fetch
await _resetEvent.WaitAsync();
}
}
}
输出如下:
12:04:51 PM Fetching...
12:04:51 PM OnNotification 0
12:04:52 PM OnNotification 1
12:04:52 PM OnNotification 2
12:04:52 PM OnNotification 3
12:04:52 PM OnNotification 4
12:04:52 PM OnNotification 5
12:04:52 PM OnNotification 6
12:04:52 PM OnNotification 7
12:04:52 PM OnNotification 8
12:04:52 PM OnNotification 9
12:04:52 PM Fetching...
12:04:53 PM OnNotification 10
12:04:53 PM OnNotification 11
12:04:53 PM OnNotification 12
12:04:53 PM OnNotification 13
12:04:53 PM OnNotification 14
12:04:53 PM Fetching...
12:04:55 PM OnNotification 0
12:04:55 PM Fetching...
12:04:55 PM OnNotification 1
12:04:55 PM OnNotification 2
12:04:55 PM OnNotification 3
12:04:56 PM OnNotification 4
12:04:56 PM OnNotification 5
12:04:56 PM OnNotification 6
12:04:56 PM OnNotification 7
12:04:56 PM OnNotification 8
12:04:56 PM OnNotification 9
12:04:56 PM Fetching...
12:04:56 PM OnNotification 10
12:04:56 PM OnNotification 11
12:04:56 PM OnNotification 12
12:04:57 PM OnNotification 13
12:04:57 PM OnNotification 14
12:04:57 PM Fetching...