如何知道下一个值来自哪个可观测值



假设我有这样的东西:

var observable = observable1
                 .Merge(observable2)
                 .Merge(observable3);
var subscription = observable.Subscribe(ValueHandler);
...
public void ValueHandler(string nextValue)
{
  Console.WriteLine($"Next value {nextValue} produced by /* sourceObservable */");
}

除了将该引用与每个可观察实现内部的值一起添加之外,是否有一种方法可以从产生下一个值的observable1observable2observable3中获得可观察的源?

不,这与Merge的设计目的正好相反。Merge的设计目的是接受多个流并将其视为一个流。如果您想用某种方法将它们分开处理,请使用不同的运算符。


编辑

至于传递源的运算符,简短的答案是否定的。Rx是关于对消息做出反应的,源是无关的。我甚至不确定你是否可以从概念上定义Rx:中的"来源"是什么

var observable1 = Observable.Interval(TimeSpan.FromSeconds(1)).Take(5);
var observable2 = observable1.Select(a => a.ToString());
var subscription = observable2.Subscribe(s => s.Dump());

订阅源是observable1observable2还是指向系统时钟的某种指针?

如果你想将进入合并的消息分开,那么你可以使用Select,如下所示:

var observable = observable1.Select(o => Tuple.Create("observable1", o))
  .Merge(observable2.Select(o => Tuple.Create("observable2", o)))
  .Merge(observable3.Select(o => Tuple.Create("observable3", o)));

如果这太乱了,那么你可以简单地制作一个扩展方法来清理它。


我还要补充一点,你在回答中发布的代码不是很像Rx。一般指导方针是避免直接实施IObservableSchool可以更简洁地重写如下:

    public class School
    {
        //private Subject<Student> _subject = null;
        private readonly ISubject<Student> _applicationStream = null;
        public static readonly int MaximumNumberOfSeats = 100;
        public string Name { get; set; }
        public School(string name)
            : this(name, new Subject<Student>())
        {
        }
        public School(string name, ISubject<Student> applicationStream )
        {
            Name = name;
            _applicationStream = applicationStream;
        }
        public void AdmitStudent(Student s)
        {
            _applicationStream.OnNext(s);
        }
        public IObservable<Student> ApplicationStream()
        {
            return _applicationStream;
        }
        public IObservable<Student> AcceptedStream()
        {
            return _applicationStream
                .SelectMany(s => s != null ? Observable.Return(s) : Observable.Throw<Student>(new ArgumentNullException("student")))
                .Distinct()
                .Take(MaximumNumberOfSeats);
        }
    }

通过这种方式,你可以订阅所有的应用程序、接受,如果你想的话,还可以订阅拒绝,等等。你的状态也更少(没有List<Student>),理想情况下,你甚至可以删除Subject<Student> applicationStream,并将其变成一个Observable,它会在某个地方传递。

不,没有任何现成的东西可以为我们提供关于哪个可观察对象生成了值的信息。

这是因为这是一个实现细节。

如果Rx的设计者必须提供这些附加信息,他们只能通过对TSource泛型类型参数施加某种限制来实现。那就不好了。这是值处理程序了解谁生成了值的唯一方法。

因此,获取这些信息的责任在于开发人员使用Rx的实现。

举个例子,假设你上了一堂School课,这是一堂可以观察到的学生课:

using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

namespace SchoolManagementSystem
{
    public class Student
    {
        public Student(string name)
        {
            Name = name;
        }
        public string Name { get; set; }
    }
    public class School : IObservable<Student>
    {
        private List<Student> _students;
        private Subject<Student> _subject = null;
        public static readonly int MaximumNumberOfSeats = 100;
        public string Name { get; set; }
        public School(string name)
        {
            Name = name;
            _students = new List<Student>();
            _subject = new Subject<Student>();
        }
        public void AdmitStudent(Student student)
        {
            if (student == null)
            {
                var ex = new ArgumentNullException("student");
                _subject.OnError(ex);
                throw ex;
            }
            try
            {
                if (_students.Count == MaximumNumberOfSeats)
                {
                    _subject.OnCompleted();
                    return;
                }
                if (!_students.Contains(student))
                {    
                    _students.Add(student);
                    _subject.OnNext(student);
                }
            }
            catch(Exception ex)
            {
                _subject.OnError(ex);
            }
        }
        public IDisposable Subscribe(IObserver<Student> observer)
        {
            return _subject.Subscribe(observer);
        }
    }
}

客户端代码是这样的:

using SchoolManagementSystem;
using System;
using System.Reactive.Linq;
namespace Client
{
    class Program
    {
        static void Main(string[] args)
        {
            var school1 = new School("School 1");
            var school2 = new School("School 2");
            var school3 = new School("School 3");
            var observable = school1
                .Merge(school2)
                .Merge(school3);
            var subscription = observable
                .Subscribe(PrintStudentAdmittedMessage, PrintNoMoreStudentsCanBeAdmittedMessage);
            school1.FillWithStudents(100);
            school2.FillWithStudents(102);
            school3.FillWithStudents(101);
            Console.WriteLine("Press any key to stop observing and to exit the program.");
            Console.ReadKey();
            subscription.Dispose();
        }
        static void PrintStudentAdmittedMessage(Student student)
        {
            Console.WriteLine($"Student admitted: {student}");
        }
        static void PrintNoMoreStudentsCanBeAdmittedMessage()
        {
            Console.WriteLine("No more students can be admitted.");
        }
    }
}

然后,为了让客户知道学生被哪个学校录取,您必须更改IObservable<TSource>TSource类型。在这种情况下,您必须更改School就是IObservable<Student>这一事实。

然而,改变这一点违背了领域模型的语义。因此,绕过这一点的方法是在学生体内引用School,如下所示:

using System;
using System.Collections.Generic;
namespace SchoolManagementSystem
{
    public class Student
    {
        public Student(string name)
        {
            Name = name;
        }
        // Add this new property so you get this information
        // in the value handler
        public School School { get; set; }
        public string Name { get; set; }
        public override string ToString()
        {
            return string.Format($"({School.Name}: {Name})");
        }
    }
}

然后,您可以更改School类的AdmitStudent方法,以指示学生被哪个学校录取,如下所示:

public void AdmitStudent(Student student)
{
    try
    {
        if (_students.Count == MaximumNumberOfSeats)
        {
            ...
        }
        if (!_students.Contains(student))
        {
            // Add this line to indicate which school
            // the student is being admitted to
            student.School = this;
            _students.Add(student);
            _subject.OnNext(student);
        }
    }
    catch(Exception ex)
    {
        _subject.OnError(ex);
    }
}

相关内容

  • 没有找到相关文章

最新更新