我正在尝试用 RX.net 制作一个小管道,但我很难弄清楚如何在不将每个进程的代码嵌套在管道中的情况下做到这一点,使代码非常混乱。
这基本上是我想做的
inputString -> toUpper Write -> reverseString -> printOutput
这是我工作的代码,但如果我想向管道添加更多东西,它就不会很漂亮。
var inputObservable = Observable.Return("hello world");
inputObservable.Subscribe(inputString =>
{
var toUpperCaseObservable = Observable.Return(inputString.ToUpper());
toUpperCaseObservable.Subscribe(toUpperCaseInput =>
{
var reverseStringObservable = Observable.Return(new String(toUpperCaseInput.Reverse().ToArray()));
reverseStringObservable.Subscribe(reverseStringInput =>
{
var writeOutputObservable = Observable.Return(reverseStringInput);
writeOutputObservable.Subscribe(input =>
{
Console.WriteLine(input);
Console.ReadLine();
});
});
});
});
你真的不想以这种方式创建 Rx "管道"。像这样的嵌套订阅并不是真正的 Rx 管道。
以下是您在基本 Rx 查询/订阅方法中执行的操作的方法:
var query =
from x in Observable.Return("hello world")
let y = x.ToUpper()
select new String(y.Reverse().ToArray());
query.Subscribe(z =>
{
Console.WriteLine(z);
Console.ReadLine();
});
简单得多,但出于说明目的,您的操作可能很昂贵,因此这可能是更好的方法:
var query =
from x in Observable.Return("hello world")
from y in Observable.Start(() => x.ToUpper())
from z in Observable.Start(() => new String(y.Reverse().ToArray()))
select z;
query.Subscribe(z =>
{
Console.WriteLine(z);
Console.ReadLine();
});
一般规则是,在管道结束之前,当您准备好对管道的结果执行操作时,您不应订阅。如果需要执行计算,则应保留为查询管道的一部分。
这也行得通
var inputObservable = Observable.Return("hello world")
.Select(s=>s.ToUpper())
.Select(upperCased=>new String(upperCased.Reverse().ToArray());
inputObservable.Subscribe(str =>
{
Console.WriteLine(str);
Console.ReadLine();
});