基于流获取数据



我们有一个通知数据更改的源,当有项进来时,我们异步获取新数据。

source.SelectMany(async n => { await FetchData()});

在等待加载数据时,可能会有许多通知进来,但我们想要忽略除1之外的所有通知,这样我们就不必为每个通知获取数据,而只需再读取一次。
在获取数据之前,我们如何忽略除1之外的所有来自source的通知?

我有一种感觉,解决方案将涉及将FetchData()转换为IObservable,但我仍然不知道什么Rx原语将允许我们组合流

看起来像一个非常经典的(但缺失的)Rx操作符:ObserveLatestOn(示例实现在这里,但你可以在网上找到其他)。

source.ObserveLatestOn(TimeSpan.Zero, Schedulers.NewThread).SelectMany(async n => { await FetchData()})

请注意,此实现仅在单线程调度器上进行了测试(主要是UI,但将与NewThread一起工作),而不是与Immediate/CurrentThread(可能工作)或TaskPool(可能有竞争条件)

还要注意,你在这里击中的是Rx中缺乏反应性拉背压。RxJava对这种情况有很好的回压支持(例如onBackpressureLatest)

我确信有一种方法可以用Rx做到这一点,但我想到的一个简单的解决方案是使用AsyncAutoResetEvent (AutoResetEvent的异步版本)。

基本上,您创建了一个异步等待AsyncAutoResetEvent设置的循环,该设置在接收到新通知时完成。自动重置确保在下一次等待时,您将被异步阻塞,直到收到新通知。

你可以在Stephen Cleary AsyncEx作为Nuget包创建的优秀库中找到AsyncAutoResetEvent类。

下面是一个简单的程序,展示了建议的解决方案:

class Program
{
    static readonly AsyncAutoResetEvent _resetEvent = new AsyncAutoResetEvent(); 
    static void Main(string[] args)
    {
        // Start the asynchronous fetching loop...
        RunAsync();
        Task.Run(async () =>
        {
            // Simulate fast notifications 
            for (int i = 0; i < 15; i++)
            {
                OnNotification(i);
                await Task.Delay(100);
            }
            // Simulate a pause of notifications 
            await Task.Delay(2000);
            // Simulate fast notifications 
            for (int i = 0; i < 15; i++)
            {
                OnNotification(i);
                await Task.Delay(100);
            }
        });
        Console.ReadKey();
    }
    static void OnNotification(int index)
    {
        Console.WriteLine(DateTime.Now.ToLongTimeString() + " OnNotification " + index);
        // This will unlock the current or next WaitAsync on the _resetEvent
        _resetEvent.Set();
    }

    static async Task RunAsync()
    {
        // Uncomment this if you want to wait for a first notification before fetching.
        // await _resetEvent.WaitAsync();    
        while (true)
        {
            Console.WriteLine(DateTime.Now.ToLongTimeString() + " Fetching...");
            // Simulate long fetching
            await Task.Delay(1000);
            // Wait for a new notification before doing another fetch
            await _resetEvent.WaitAsync();    
        }
    }
}

输出如下:

12:04:51 PM Fetching...
12:04:51 PM OnNotification 0
12:04:52 PM OnNotification 1
12:04:52 PM OnNotification 2
12:04:52 PM OnNotification 3
12:04:52 PM OnNotification 4
12:04:52 PM OnNotification 5
12:04:52 PM OnNotification 6
12:04:52 PM OnNotification 7
12:04:52 PM OnNotification 8
12:04:52 PM OnNotification 9
12:04:52 PM Fetching...
12:04:53 PM OnNotification 10
12:04:53 PM OnNotification 11
12:04:53 PM OnNotification 12
12:04:53 PM OnNotification 13
12:04:53 PM OnNotification 14
12:04:53 PM Fetching...
12:04:55 PM OnNotification 0
12:04:55 PM Fetching...
12:04:55 PM OnNotification 1
12:04:55 PM OnNotification 2
12:04:55 PM OnNotification 3
12:04:56 PM OnNotification 4
12:04:56 PM OnNotification 5
12:04:56 PM OnNotification 6
12:04:56 PM OnNotification 7
12:04:56 PM OnNotification 8
12:04:56 PM OnNotification 9
12:04:56 PM Fetching...
12:04:56 PM OnNotification 10
12:04:56 PM OnNotification 11
12:04:56 PM OnNotification 12
12:04:57 PM OnNotification 13
12:04:57 PM OnNotification 14
12:04:57 PM Fetching...

相关内容

  • 没有找到相关文章