我只想学习两者以及如何一起使用它们。我知道它们可以相互补充,我只是找不到一个有人真正做到这一点的例子。
让我从一些背景开始。
.NET框架有许多特殊类型-适当的类或接口-Task<T>
、IObservable<T>
、Nullable<T>
、IEnumerable<T>
、Lazy<T>
等-为底层类型T
提供特殊的功能。
TPL使用Task<T>
来表示T
的单个值的异步计算。
Rx使用CCD_ 9来表示CCD_。
正是这两者的"异步计算"方面将TPL和Rx结合在一起。
现在,TPL也使用类型Task
来表示Action
lambda的异步执行,但这可以被认为是Task<T>
的特殊情况,其中T
是void
。非常像c#中的标准方法,返回void
,如下所示:
public void MyMethod() { }
Rx还允许使用称为Unit
的特殊类型的相同特殊情况。
TPL和Rx之间的差异在于返回的值的数量。TPL是一且仅一,而Rx是零或更多。
因此,如果你以一种特殊的方式处理Rx,只处理返回单个值的可观察序列,你可以用类似于TPL的方式进行一些计算。
例如,在TPL中,我可以写:
Task.Factory
.StartNew(() => "Hello")
.ContinueWith(t => Console.WriteLine(t.Result));
在Rx中,等价物为:
Observable
.Start(() => "Hello")
.Subscribe(x => Console.WriteLine(x));
我可以在Rx中更进一步,指定TPL应该用于执行计算,如下所示:
Observable
.Start(() => "Hello", Scheduler.TaskPool)
.Subscribe(x => Console.WriteLine(x));
(默认情况下使用线程池。)
现在我可以做一些"混搭"了。如果我添加对System.Reactive.Threading.Tasks
名称空间的引用,我可以很容易地在任务和可观察对象之间移动。
Task.Factory
.StartNew(() => "Hello")
.ToObservable()
.Subscribe(x => Console.WriteLine(x));
Observable
.Start(() => "Hello")
.ToTask()
.ContinueWith(t => Console.WriteLine(t.Result));
注意CCD_ 19&.ToTask()
调用,结果从一个库翻转到另一个库。
如果我有一个返回多个值的observable,我可以使用observable .ToArray()
扩展方法将多个序列值转换为一个可以转换为任务的单个数组值。像这样:
Observable
.Interval(TimeSpan.FromSeconds(1.0))
.Take(5) // is IObservable<long>
.ToArray()
.ToTask() // is Task<long[]>
.ContinueWith(t => Console.WriteLine(t.Result.Length));
我认为这是对你问题的一个相当基本的回答。这是你所期望的吗?