在同一个IObservable订阅中访问IObservable



这是我试图用响应式扩展做的一个简单的例子,但是它不起作用

Add在这个简单的例子中不起作用

    public static void Main(string[] args)
    {
        var list = new List<int> { 1, 2, 3 };
        var obs = list.ToObservable();
        IDisposable subscription = obs.SubscribeOn(Scheduler.NewThread).Subscribe(p =>
        {
            Console.WriteLine(p.ToString());
            Console.WriteLine(Add(obs).ToString());
        },
        err => Console.WriteLine("Error"),
        () => Console.WriteLine("Sequence Completed")
        );
        Console.ReadLine();
        subscription.Dispose();
    }
    private static int Add(IObservable<int> wholeList)
    {
        int sum = 0;
        wholeList.ForEach(i => sum = sum + i);
        return sum;
    }

实际输出
1
_

期望输出值

1
6
2
6
3
6
Sequence Completed
_

。我想在每次迭代中执行一个方法Add(obs),其中obs本身是冷的IObservable正在进行迭代?

修改如下:

IDisposable subscription = obs.SubscribeOn(Scheduler.NewThread)

:

IDisposable subscription = obs.ObserveOn(Scheduler.NewThread)

你应该注意到,就Rx而言,你正在做一件坏事。你在可观察范围内进进出出。你应该尽可能避免这种情况。

所以,例如,避免这样:

    var list = new List<int> { 1, 2, 3 };
    var obs = list.ToObservable();

    var obs = Observable.Range(1, 3);

整个static int Add(IObservable<int> wholeList)方法是不好的。它调用ForEach(这通常应该是一个警告,你正在做错误的事情)从可观察对象中取出值。这就是可能发生死锁的地方。

已经有一个名为Sum的可观察扩展,它返回IObservble<int>,这不会使您退出可观察对象。

所以试着这样写你的代码:
var obs = Observable.Range(1, 3);
var query =
    from n in obs
    from s in obs.Sum()
    select new
    {
        Number = n.ToString(),
        Sum = s.ToString(),
    };
using (var subscription = query.SubscribeOn(Scheduler.NewThread).Subscribe(
    x =>
        {
            Console.WriteLine(x.Number);
            Console.WriteLine(x.Sum);
        },
    err =>
        Console.WriteLine("Error"),
    () =>
        Console.WriteLine("Sequence Completed")))
{
    Console.ReadLine();
}

根据您的评论,我建议您根据需要创建可观察对象来生成项目,而不是在订阅后执行此操作。在你的例子中,你可以这样做:

var list = new List<int> { 1, 2, 3 };
var obs = list.ToObservable().Select(i => new Tuple<int,IObservable<int>>(i,list.ToObservable()));
obs.SubscribeOn(Scheduler.NewThread).Subscribe(t => {
  Console.WriteLine(t.Item1);
  SaveItems(t.Item2);
});

相关内容

  • 没有找到相关文章

最新更新