这是我试图用响应式扩展做的一个简单的例子,但是它不起作用
Add在这个简单的例子中不起作用
public static void Main(string[] args)
{
var list = new List<int> { 1, 2, 3 };
var obs = list.ToObservable();
IDisposable subscription = obs.SubscribeOn(Scheduler.NewThread).Subscribe(p =>
{
Console.WriteLine(p.ToString());
Console.WriteLine(Add(obs).ToString());
},
err => Console.WriteLine("Error"),
() => Console.WriteLine("Sequence Completed")
);
Console.ReadLine();
subscription.Dispose();
}
private static int Add(IObservable<int> wholeList)
{
int sum = 0;
wholeList.ForEach(i => sum = sum + i);
return sum;
}
实际输出1
_
期望输出值
1
6
2
6
3
6
Sequence Completed
_
。我想在每次迭代中执行一个方法Add(obs),其中obs本身是冷的IObservable正在进行迭代?
修改如下:
IDisposable subscription = obs.SubscribeOn(Scheduler.NewThread)
:
IDisposable subscription = obs.ObserveOn(Scheduler.NewThread)
你应该注意到,就Rx而言,你正在做一件坏事。你在可观察范围内进进出出。你应该尽可能避免这种情况。
所以,例如,避免这样:
var list = new List<int> { 1, 2, 3 };
var obs = list.ToObservable();
var obs = Observable.Range(1, 3);
整个static int Add(IObservable<int> wholeList)
方法是不好的。它调用ForEach
(这通常应该是一个警告,你正在做错误的事情)从可观察对象中取出值。这就是可能发生死锁的地方。
已经有一个名为Sum
的可观察扩展,它返回IObservble<int>
,这不会使您退出可观察对象。
var obs = Observable.Range(1, 3);
var query =
from n in obs
from s in obs.Sum()
select new
{
Number = n.ToString(),
Sum = s.ToString(),
};
using (var subscription = query.SubscribeOn(Scheduler.NewThread).Subscribe(
x =>
{
Console.WriteLine(x.Number);
Console.WriteLine(x.Sum);
},
err =>
Console.WriteLine("Error"),
() =>
Console.WriteLine("Sequence Completed")))
{
Console.ReadLine();
}
根据您的评论,我建议您根据需要创建可观察对象来生成项目,而不是在订阅后执行此操作。在你的例子中,你可以这样做:
var list = new List<int> { 1, 2, 3 };
var obs = list.ToObservable().Select(i => new Tuple<int,IObservable<int>>(i,list.ToObservable()));
obs.SubscribeOn(Scheduler.NewThread).Subscribe(t => {
Console.WriteLine(t.Item1);
SaveItems(t.Item2);
});