如何从过程代码中用勺子喂入IOobservable



关于Reactive Extensions的大多数示例代码都围绕着如何在序列上组成逻辑和运算符展开。

围绕Observable生成的部分主要围绕"FromEventPatter"、"FromAsynch"等

IObservable<string> observableHotStatus = ??;
foreach (var task in todo)
{
   //Process task;
   //Post status message into observable; How do I do this?
}

简而言之,我想要一个可以发布到的对象,比如ActionBlock、Action(of T)或类似的东西。实现这一点最简单的方法是什么?

编辑:

仔细检查您的代码,我建议使用Observable.Create。即使它只返回可观察,您也可以将Publish运算符应用于生成的可观察,使其

如果task实际上指的是Task<T>,那么可以使用Observable.Create的重载来定义异步迭代器。例如:

IObservable<string> statuses = Observable.Create<string>(
  (observer, cancel) =>
  {
    foreach (var task in todo)
    {
      cancel.ThrowIfCancellationRequested();
      await task;
      observer.OnNext("Status");
    }
  });

以前的回答:

你可以使用以下类型之一,但我建议在做出决定之前先阅读"使用主题"或"不使用主题"。

  • Subject<T>:通用,类似"事件",热可观察。调用OnNext就像引发一个经典的.NET事件
  • BehaviorSubject<T>:通常用作属性的支持字段,它表示可观察的变化"事件"序列。每当观察者订阅时,它都会立即接收当前值,然后是对属性的所有更改。您可以随时从Value属性中提取当前值;例如,在您房产的getter内。在属性的setter中调用OnNext,就不必保留重复的backing字段。这也是Rx的连续函数版本,如果我对FRP的理解正确的话,这是你在Rx中唯一能找到的类似FRP的东西
  • ReplaySubject<T>:通常用作"事件"的历史缓冲区,它表示一个可观察的值序列,从观察者错过的值开始,每当观察者订阅时。您可以控制缓冲值的距离;这就像一个浏览价值观历史的滑动窗口。你很少需要使用这种类型。在大多数情况下,Observable.Replay运算符将执行此操作
  • AsyncSubject<T>:通常用于捕获hot的结果,异步函数如Task<T>。你很少需要使用这种类型。在大多数情况下,可以使用Observable.FromAsyncPatternTask转换运算符

相关内容

  • 没有找到相关文章

最新更新