关于Reactive Extensions的大多数示例代码都围绕着如何在序列上组成逻辑和运算符展开。
围绕Observable生成的部分主要围绕"FromEventPatter"、"FromAsynch"等
IObservable<string> observableHotStatus = ??;
foreach (var task in todo)
{
//Process task;
//Post status message into observable; How do I do this?
}
简而言之,我想要一个可以发布到的对象,比如ActionBlock、Action(of T)或类似的东西。实现这一点最简单的方法是什么?
编辑:
仔细检查您的代码,我建议使用Observable.Create
。即使它只返回冷可观察,您也可以将Publish
运算符应用于生成的可观察,使其热。
如果task
实际上指的是Task<T>
,那么可以使用Observable.Create
的重载来定义异步迭代器。例如:
IObservable<string> statuses = Observable.Create<string>(
(observer, cancel) =>
{
foreach (var task in todo)
{
cancel.ThrowIfCancellationRequested();
await task;
observer.OnNext("Status");
}
});
以前的回答:
你可以使用以下类型之一,但我建议在做出决定之前先阅读"使用主题"或"不使用主题"。
Subject<T>
:通用,类似"事件",热可观察。调用OnNext
就像引发一个经典的.NET事件BehaviorSubject<T>
:通常用作属性的支持字段,它表示可观察的变化"事件"序列。每当观察者订阅时,它都会立即接收当前值,然后是对属性的所有更改。您可以随时从Value
属性中提取当前值;例如,在您房产的getter内。在属性的setter中调用OnNext
,就不必保留重复的backing字段。这也是Rx的连续函数版本,如果我对FRP的理解正确的话,这是你在Rx中唯一能找到的类似FRP的东西ReplaySubject<T>
:通常用作"事件"的历史缓冲区,它表示一个可观察的值序列,从观察者错过的值开始,每当观察者订阅时。您可以控制缓冲值的距离;这就像一个浏览价值观历史的滑动窗口。你很少需要使用这种类型。在大多数情况下,Observable.Replay
运算符将执行此操作AsyncSubject<T>
:通常用于捕获hot的结果,异步函数如Task<T>
。你很少需要使用这种类型。在大多数情况下,可以使用Observable.FromAsyncPattern
或Task
转换运算符