我正在学习响应式扩展,这些天我遇到了这种情况,代码在这里:
class Program
{
private static void Main(string[] args)
{
var ls = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9 }.ToObservable();
ls.Select(m => new
{
t = Observable.Start(() =>
{
Thread.Sleep(100);
return new Random().Next(3, 20);
}),
i = m
}).Subscribe(item => item.t.Subscribe(Console.WriteLine));
Task.WaitAll();
Console.WriteLine("all done");
Console.ReadKey();
}
}
它表明在可观察量中有一个 IObservable,我想在所有过程完成后打印"全部完成",但这不起作用。程序启动后立即快速打印"全部完成",不再等待,在我这里的情况下,我应该怎么做才能获得真正的 WaitAll?
这不是Rx的真正工作方式。Task.WaitAll()
和您的 Rx 代码之间没有链接。您甚至不会将任何任务传递给WaitAll()
方法;-)
所以首先,Subscribe
方法是非阻塞的。它只是指出此时我想开始使用此序列,这就是向我发送值/错误/完成通知时要做的事情。
您的嵌套可观察序列是一个相当高级的主题,可以直接进入,但这没关系,我们可以使用它。
class Program
{
private static void Main(string[] args)
{
//Let go, we are not IEnumerable any more :-)
var ls = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9 }.ToObservable();
var subscription = ls.Select(_ =>
Observable.Start(() =>
{
Thread.Sleep(100);
return new Random().Next(3, 20);
})
})
.Merge() //Merge the IO<IO<T>> into Io<T> so we get a single completion.
.Subscribe(
item => item.Subscribe(Console.WriteLine),
()=>Console.WriteLine("all done"));
Console.ReadKey();
subscription.Dispose();
}
}
您可以通过将 Observable.Start
+ Thread.Sleep
替换为 Rx 方法(如 Observable.Timer
或 Scheduler
)来进一步改进代码。
这里要带走的关键是 Rx 是异步的。关键是不要阻止。此代码中唯一阻止的内容是 Thread.Sleep
和 Console.ReadKey()
.理想情况下,如上所述,无论如何您都会更换Thread.Sleep
。
你似乎在这里做了一些"非Rx"编码。您尝试执行的任务实际上非常简单。
首先,您有一些代码在完成一些工作后生成值。我将其重新编码为:
var rnd = new Random();
Func<int> produceValue = () =>
{
Thread.Sleep(100);
return rnd.Next(3, 20);
};
这使它与 Rx 代码分开。作为旁注,我已经将new Random()
声明拉到了函数之外,因为继续实例化新的Random
实例是不正确的 - 你不一定会以这种方式获得随机数。您还应该实例化一次并使用相同的实例。
所以现在生成可观察量的代码很简单:
var query =
from n in Observable.Range(1, 9)
from m in Observable.Start(produceValue)
select m;
订阅它也很容易:
query.Subscribe(
Console.WriteLine,
() => Console.WriteLine("All Done."));
我认为这完全符合您尝试编码的目的,没有任何讨厌的WaitFor
代码。