如何使用Rx.NET中的管道混合网络和文件系统IO



我有以下要求:

  1. 收集多个远程站点的某些信息
  2. 将信息序列化到磁盘
  3. 联系同一站点,确认数据收集成功。

这是一个非常简化的流程,真正的流程还必须处理故障并具有其他方面,我认为这些方面与我的问题无关,至少目前看来是这样。

无论如何,这是我如何实现所描述的流程:

var data = await GetSitesSource()
    .Select(site => Observable
        .FromAsync(() => GetInformationFromSiteAsync(site))
        .Select(site.MakeKeyValuePair))
    .Merge(maxConcurrentSiteRequests)
    .ToList();
if (data.Count > 0)
{
    var filePath = GetFilePath();
    using (var w = new StreamWriter(filePath))
    {
        await w.WriteAsync(YieldLines(data));
    }
    var tsUTC = DateTime.UtcNow;
    await data.ToObservable()
        .Select(o => Observable.FromAsync(() => AckInformationFromSiteAsync(o.Key, tsUTC, o.Value.InformationId)))
        .Merge(maxConcurrentSiteRequests);
}

地点:

  • MakeKeyValuePair是一个返回KeyValuePair<K,V>实例的扩展方法
  • YieldLinesdata转化为IEnumerable<string>
  • WriteAsync是一个虚构的扩展方法,将一系列字符串写入StreamWriter

这似乎不是一个很好的实现,因为我没有利用这样一个事实,即我可以在第一个Merge运算符出现时开始写出记录。

我可以使用SelectMany + Merge(1)运算符异步地将块写入文件(顺序无关紧要),但是我如何确保各自的StreamWriter仅在需要时初始化并正确处理?因为如果没有数据,我甚至不想初始化StreamWriter .

我的问题-如何重写这段代码,使可观察管道不会在中间中断写入文件?它应该包括所有三个阶段:

  1. 从多个站点获取数据
  2. 逐条写入数据块,顺序无关
  3. 所有数据写入后确认数据

我还没有测试过这个,但是你的代码都不排除将它连接在一起。所以你可以这样做:

//The ToObservable extension for Task is only available through
using System.Reactive.Threading.Tasks;
GetSitesSource()
    .Select(site => Observable
        .FromAsync(() => GetInformationFromSiteAsync(site))
        .Select(site.MakeKeyValuePair))
    .Merge(maxConcurrentSiteRequests)
    .ToList()
    //Only proceed if we received data
    .Where(data => data.Count > 0)
    .SelectMany(data =>
      //Gives the StreamWriter the same lifetime as this Observable once it subscribes
      Observable.Using(
        () => new StreamWriter(GetFilePath()), 
        (w) => w.WriteAsync(YieldLines(data)).ToObservable()),
      //We are interested in the original data value, not the write result
      (data, _) => data)
    //Attach a timestamp of when data passed through here
    .Timestamp()
    .SelectMany(o=> {
      var ts = o.Timestamp;
      var data= o.Value;
      //This is actually returning IEnumerable<IObservable<T>> but merge
      //will implicitly handle it.
      return data.Select(i => Observable.FromAsync(() => 
                               AckInformationFromSiteAsync(i.Key, ts,
                                                           i.Value.InformationId)))
                .Merge(maxConcurrentSiteRequests);
    })
    //Handle the return values, fatal errors and the completion of the stream.
    .Subscribe();

更全面地回答你的问题

Using操作符将一个必须实现IDisposable的资源绑定到Observable的生命周期。第一个参数是一个工厂函数,它会在Observable被订阅时被调用一次。

最新更新