是否有一个运算符,工作方式类似 Scan,但允许我返回 IObservable<TResult> 而不是 IObservable<TSource>?



在这个纯粹为练习而编的例子中,我想返回的是:

如果两个学生在指定的时间段内加入一所学校,比如说2秒,那么我想要一个数据结构来返回两个学生、他们加入的学校以及他们加入之间的时间间隔。

我一直在思考以下几点:

// Console entry point from the question: tries to report students who join
// the same school within a given time span of one another.
// NOTE: this snippet illustrates the problem and is not complete — the Scan
// lambda below contains no return statement.
class Program
{
    // Starts the observation with a two-second window.
    static void Main(string[] args)
    {
        ObserveStudentsJoiningWithin(TimeSpan.FromSeconds(2));
    }
    // Wires the school's StudentAdmitted event into an observable pipeline,
    // starts two asynchronous fills, and keeps the subscription alive until a
    // key is pressed.
    static void ObserveStudentsJoiningWithin(TimeSpan timeSpan)
    {
        var school = new School("School 1");
        // The event is looked up by its name, given as a string.
        var admissionObservable =
            Observable.FromEventPattern<StudentAdmittedEventArgs>(school, "StudentAdmitted");
        // NOTE(review): TimeInterval presumably already reports the gap since the
        // previous element, in which case subtracting two Interval values is not
        // the intended comparison — confirm against the Rx documentation.
        var observable = admissionObservable.TimeInterval()
            .Scan((current, next) =>
            {
                if (next.Interval - current.Interval <= timeSpan)
                {
                    // But this won't work for me because
                    // this requires me to return a TSource
                    // and not a TResult
                }
            });
        // TimeIntervalValueHandler is not defined in this snippet.
        var subscription = observable.Subscribe(TimeIntervalValueHandler);
        // Both fills are started without being awaited, so they run side by side.
        school.FillWithStudentsAsync(10, TimeSpan.FromSeconds(3));
        school.FillWithStudentsAsync(8, TimeSpan.FromSeconds(1));
        Console.WriteLine("Press any key to exit the program");
        Console.ReadKey();
        subscription.Dispose();
    }
}

以下是领域模型(domain)的代码:

using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace SchoolManagementSystem
{
    /// <summary>
    /// A student identified only by a display name.
    /// </summary>
    public class Student
    {
        // Counter behind the generated names; shared by every caller of CreateRandom.
        private static int _studentNumber;

        /// <summary>Creates a student with the given name.</summary>
        /// <param name="name">Display name of the student.</param>
        public Student(string name)
        {
            Name = name;
        }

        /// <summary>Display name of the student.</summary>
        public string Name { get; set; }

        /// <summary>
        /// Creates a student named "Student N", where N is the next value of a
        /// process-wide counter starting at 1.
        /// </summary>
        /// <returns>A new student with a generated name.</returns>
        public static Student CreateRandom()
        {
            // Atomic increment: the fill helpers may call this from several
            // threads at once, and a plain ++ could hand out the same number twice.
            var number = System.Threading.Interlocked.Increment(ref _studentNumber);
            return new Student($"Student {number}");
        }

        /// <summary>Returns the student's name.</summary>
        public override string ToString()
        {
            return Name;
        }
    }
    /// <summary>
    /// A named school that keeps a list of admitted students and raises
    /// <see cref="StudentAdmitted"/> each time a new student is added.
    /// </summary>
    public class School: IEnumerable<Student>
    {
        // Guards _students: admissions may arrive from several threads when
        // more than one asynchronous fill is running.
        private readonly object _gate = new object();
        private readonly List<Student> _students;

        /// <summary>Raised after a student has been added to the school.</summary>
        public event StudentAdmitted StudentAdmitted;

        /// <summary>Name of the school.</summary>
        public string Name { get; set; }

        /// <summary>Creates an empty school with the given name.</summary>
        /// <param name="name">Name of the school.</param>
        public School(string name)
        {
            Name = name;
            _students = new List<Student>();
        }

        /// <summary>
        /// Adds the student if not already present and raises
        /// <see cref="StudentAdmitted"/>. Admitting the same student twice is a no-op.
        /// </summary>
        /// <param name="student">The student to admit.</param>
        /// <exception cref="ArgumentNullException"><paramref name="student"/> is null.</exception>
        public void AdmitStudent(Student student)
        {
            if (student == null)
            {
                throw new ArgumentNullException(nameof(student));
            }

            lock (_gate)
            {
                if (_students.Contains(student))
                {
                    return;
                }
                _students.Add(student);
            }

            // Raised outside the lock so that event handlers never run while
            // the list is locked.
            OnStudentAdmitted(this, student);
        }

        /// <summary>Builds the event arguments and invokes the event, if it has subscribers.</summary>
        /// <param name="school">School stored in the event arguments.</param>
        /// <param name="student">Student stored in the event arguments.</param>
        protected virtual void OnStudentAdmitted(School school, Student student)
        {
            var args = new StudentAdmittedEventArgs(school, student);
            StudentAdmitted?.Invoke(this, args);
        }

        /// <summary>
        /// Enumerates the admitted students. The enumeration is not synchronized
        /// with <see cref="AdmitStudent"/>.
        /// </summary>
        public IEnumerator<Student> GetEnumerator()
        {
            return _students.GetEnumerator();
        }

        IEnumerator IEnumerable.GetEnumerator()
        {
            return GetEnumerator();
        }
    }

    /// <summary>
    /// Handler signature for the <c>StudentAdmitted</c> event declared on <see cref="School"/>.
    /// </summary>
    /// <param name="sender">The object raising the event.</param>
    /// <param name="args">Carries the school and the student that was admitted.</param>
    public delegate void StudentAdmitted(object sender, StudentAdmittedEventArgs args);
    /// <summary>
    /// Event data for a student admission: the school and the admitted student.
    /// </summary>
    public class StudentAdmittedEventArgs : EventArgs
    {
        /// <summary>The school passed to the constructor.</summary>
        public School School { get; protected set; }

        /// <summary>The student passed to the constructor.</summary>
        public Student Student { get; protected set; }

        /// <summary>Stores the school and the student on the event data.</summary>
        /// <param name="school">School to store.</param>
        /// <param name="student">Student to store.</param>
        public StudentAdmittedEventArgs(School school, Student student)
        {
            this.School = school;
            this.Student = student;
        }
    }
    /// <summary>
    /// Helpers for populating a <see cref="School"/> with generated students.
    /// </summary>
    public static class Extensions
    {
        /// <summary>
        /// Admits <paramref name="howMany"/> generated students, waiting
        /// <paramref name="gapBetweenEachAdmission"/> before each admission.
        /// A request for exactly one student is admitted immediately, without a delay.
        /// </summary>
        /// <param name="school">School to fill.</param>
        /// <param name="howMany">Number of students to admit; must not be negative.</param>
        /// <param name="gapBetweenEachAdmission">Delay applied before each admission.</param>
        /// <returns>
        /// A task that completes when all students have been admitted. Callers
        /// that do not await it keep the previous fire-and-forget behavior.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="school"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="howMany"/> is negative.</exception>
        public static Task FillWithStudentsAsync(this School school, int howMany, TimeSpan gapBetweenEachAdmission)
        {
            // Validate synchronously so that the caller sees argument errors at
            // the call site instead of as an unobservable failure later on.
            if (school == null)
            {
                throw new ArgumentNullException(nameof(school));
            }
            if (howMany < 0)
            {
                throw new ArgumentOutOfRangeException(nameof(howMany));
            }

            return FillCoreAsync(school, howMany, gapBetweenEachAdmission);
        }

        // Performs the admissions after the arguments have been validated.
        private static async Task FillCoreAsync(School school, int howMany, TimeSpan gapBetweenEachAdmission)
        {
            if (howMany == 1)
            {
                school.AdmitStudent(Student.CreateRandom());
                return;
            }

            for (int i = 0; i < howMany; i++)
            {
                await Task.Delay(gapBetweenEachAdmission);
                school.AdmitStudent(Student.CreateRandom());
            }
        }
    }
}

然而,Scan 运算符只允许我返回元素类型同为 TSource 的可观察序列。Select 在这里也不适用:虽然 Select 可以把 TSource 转换为 TResult,但它无法向前看(而 Scan 可以),因此不能把当前项与下一项一起投影。

我在找介于两者之间的东西。

  1. 对于成对比较(即原问题中所说的"将当前项与下一项一起投影"),可以使用 Buffer 方法构建一个由"对"组成的序列。
  2. 要计算学生加入的时间间隔,使用 Timestamp 可能比 TimeInterval 更合适,原因在于这一行:next.Interval - current.Interval <= timeSpan。你真正想要的是类似 pair[1].Timestamp - pair[0].Timestamp <= timeSpan 的比较。

以下代码会得到 4 对结果:(学生11、学生12)、(学生13、学生14)、(学生15、学生16)、(学生17、学生18):

// Pairs up admissions two at a time (1st+2nd, 3rd+4th, ...) and keeps the
// pairs whose timestamps are at most timeSpan apart.
var admissionObservable = Observable
        .FromEventPattern<StudentAdmittedEventArgs>(school, "StudentAdmitted")
        .Timestamp()
        .Buffer(2)
        // Buffer(2) can emit a final buffer holding a single element when the
        // source completes on an odd count; skip it instead of indexing pair[1].
        .Where(pair => pair.Count == 2 && pair[1].Timestamp - pair[0].Timestamp <= timeSpan)
        .Select(pair => new JoiningData
        {
            Students = Tuple.Create(pair[0].Value.EventArgs.Student, pair[1].Value.EventArgs.Student),
            School = pair[0].Value.EventArgs.School,
            Interval = pair[1].Timestamp - pair[0].Timestamp
        });
  3. 正如 @Enigmativity 所提到的,更好的做法是将每个元素与它的下一个元素进行比较。为此可以使用 Zip 方法:

以下代码会得到 8 对结果:(学生10、学生11)、(学生11、学生12)、(学生12、学生13)、(学生13、学生14)、(学生14、学生15)、(学生15、学生16)、(学生16、学生17)、(学生17、学生18):

// Timestamped stream of admissions.
var admissionObservable = Observable
     .FromEventPattern<StudentAdmittedEventArgs>(school, "StudentAdmitted")
     .Timestamp();
// Pairs every admission with the one that follows it and keeps the pairs whose
// timestamps are at most timeSpan apart. The query is assigned to a variable so
// it can be subscribed to.
var joiningObservable = admissionObservable
    // Publish shares one subscription between the stream and its Skip(1) copy,
    // so the event is hooked once and each admission is timestamped once.
    .Publish(shared => shared.Zip(shared.Skip(1), (a, b) => Tuple.Create(a, b)))
    .Where(pair => pair.Item2.Timestamp - pair.Item1.Timestamp <= timeSpan)
    .Select(pair => new JoiningData
    {
        Students = Tuple.Create(pair.Item1.Value.EventArgs.Student, pair.Item2.Value.EventArgs.Student),
        School = pair.Item1.Value.EventArgs.School,
        Interval = pair.Item2.Timestamp - pair.Item1.Timestamp
    });

你能试试这个,看看它是否能满足你的需求吗?

// Produces arrays of admissions that fall inside a window of length timeSpan.
// NOTE(review): whether the element that opens a window is itself part of that
// window depends on Rx subscription order — confirm before relying on it.
IObservable<EventPattern<StudentAdmittedEventArgs>[]> observable =
    admissionObservable
        // pxs is used both as the windowed source and as the window openings.
        .Publish(pxs =>
            pxs
                // One window per element of pxs; the timer passed as the closing
                // selector ends it after timeSpan.
                .Window(pxs, x => Observable.Timer(timeSpan))
                // Only the first two elements of each window are kept.
                .Select(ys => ys.Take(2)))
        // Collect each window into an array.
        .SelectMany(ys => ys.ToArray())
        // Keep only arrays with more than one element.
        .Where(ys => ys.Skip(1).Any());

以下是我所做的:

/// <summary>
/// Reports every pair of consecutive admissions to "School 1" that happened at
/// most <paramref name="timeSpan"/> apart, then waits for a key press.
/// </summary>
/// <param name="timeSpan">Largest gap between two admissions that still counts as a pair.</param>
static void ObserveStudentsJoiningWithin(TimeSpan timeSpan)
{
    var school = new School("School 1");
    var admissionObservable =
        Observable.FromEventPattern<StudentAdmittedEventArgs>(school, "StudentAdmitted");
    var observable = admissionObservable.TimeInterval()
        .Scan<TimeInterval<EventPattern<StudentAdmittedEventArgs>>, StudentPair>(null, (previousPair, current) =>
        {
            Debug.Print($"Student joined after {current.Interval.TotalSeconds} seconds, timeSpan = {timeSpan.TotalSeconds} seconds");

            // previousPair is null only for the very first admission (the seed).
            var joinedCloseToPrevious = previousPair != null && current.Interval <= timeSpan;

            // Always carry the current student forward as SecondStudent, even
            // when the gap was too long. Returning null here would make the
            // accumulator forget this student, and a following student who
            // joins right after would have nobody to be paired with.
            return new StudentPair
            {
                FirstStudent = joinedCloseToPrevious ? previousPair.SecondStudent : null,
                SecondStudent = current.Value.EventArgs.Student,
                IntervalBetweenJoining = current.Interval,
                School = current.Value.EventArgs.School
            };
        })
        // A missing FirstStudent marks "no partner within timeSpan".
        .Where(p => p != null && p.FirstStudent != null);
    var subscription = observable.Subscribe(StudentPairValueHandler);
    // These calls are made one after another, before the exit prompt is printed.
    school.FillWithStudents(4, TimeSpan.FromSeconds(1));
    school.FillWithStudents(2, TimeSpan.FromSeconds(10));
    school.FillWithStudents(3, TimeSpan.FromSeconds(2));
    school.FillWithStudents(2, TimeSpan.FromSeconds(5));
    school.FillWithStudents(5, TimeSpan.FromSeconds(0.6));
    Console.WriteLine("Press any key to exit the program");
    Console.ReadKey();
    subscription.Dispose();
}
/// <summary>
/// Writes one line describing the pair to the console. Does nothing when the
/// pair is null or has no first student.
/// </summary>
/// <param name="pair">The pair to report.</param>
static void StudentPairValueHandler(StudentPair pair)
{
    if (pair == null || pair.FirstStudent == null)
    {
        return;
    }

    var secondsApart = Math.Round(pair.IntervalBetweenJoining.TotalSeconds, 2);
    Console.WriteLine($"{pair.SecondStudent.Name} joined {pair.School.Name} {secondsApart} seconds after {pair.FirstStudent.Name}.");
}
...
/// <summary>
/// Two students, the school they joined, and the time between their admissions.
/// </summary>
public class StudentPair
{
    // Auto-properties instead of public fields; existing code that reads or
    // assigns these members compiles unchanged.

    /// <summary>The student who joined first; null when there is no earlier partner.</summary>
    public Student FirstStudent { get; set; }

    /// <summary>The student who joined second.</summary>
    public Student SecondStudent { get; set; }

    /// <summary>The school the students joined.</summary>
    public School School { get; set; }

    /// <summary>Time between the two admissions.</summary>
    public TimeSpan IntervalBetweenJoining { get; set; }
}

/// <summary>
/// Helpers for populating a <see cref="School"/> with generated students,
/// either blocking the caller or asynchronously.
/// </summary>
public static class Extensions
{
    /// <summary>Admits <paramref name="howMany"/> generated students with no delay between them.</summary>
    /// <param name="school">School to fill.</param>
    /// <param name="howMany">Number of students to admit; must not be negative.</param>
    public static void FillWithStudents(this School school, int howMany)
    {
        FillWithStudents(school, howMany, TimeSpan.Zero);
    }

    /// <summary>
    /// Admits <paramref name="howMany"/> generated students, blocking the calling
    /// thread for <paramref name="gapBetweenEachAdmission"/> before each admission.
    /// A request for exactly one student is admitted immediately, without a delay.
    /// </summary>
    /// <param name="school">School to fill.</param>
    /// <param name="howMany">Number of students to admit; must not be negative.</param>
    /// <param name="gapBetweenEachAdmission">Delay applied before each admission.</param>
    /// <exception cref="ArgumentNullException"><paramref name="school"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="howMany"/> is negative.</exception>
    public static void FillWithStudents(this School school, int howMany, TimeSpan gapBetweenEachAdmission)
    {
        ValidateArguments(school, howMany);

        if (howMany == 1)
        {
            school.AdmitStudent(Student.CreateRandom());
            return;
        }

        for (int i = 0; i < howMany; i++)
        {
            // The TimeSpan overload avoids the lossy double-to-int cast.
            Thread.Sleep(gapBetweenEachAdmission);
            school.AdmitStudent(Student.CreateRandom());
        }
    }

    /// <summary>
    /// Asynchronous counterpart of <see cref="FillWithStudents(School, int, TimeSpan)"/>:
    /// waits with <see cref="Task.Delay(TimeSpan)"/> instead of blocking the thread.
    /// </summary>
    /// <param name="school">School to fill.</param>
    /// <param name="howMany">Number of students to admit; must not be negative.</param>
    /// <param name="gapBetweenEachAdmission">Delay applied before each admission.</param>
    /// <returns>
    /// A task that completes when all students have been admitted. Callers
    /// that do not await it keep the previous fire-and-forget behavior.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="school"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="howMany"/> is negative.</exception>
    public static Task FillWithStudentsAsync(this School school, int howMany, TimeSpan gapBetweenEachAdmission)
    {
        // Validate synchronously so that the caller sees argument errors at the
        // call site instead of as an unobservable failure later on.
        ValidateArguments(school, howMany);
        return FillCoreAsync(school, howMany, gapBetweenEachAdmission);
    }

    // Performs the asynchronous admissions after the arguments have been validated.
    private static async Task FillCoreAsync(School school, int howMany, TimeSpan gapBetweenEachAdmission)
    {
        if (howMany == 1)
        {
            school.AdmitStudent(Student.CreateRandom());
            return;
        }

        for (int i = 0; i < howMany; i++)
        {
            await Task.Delay(gapBetweenEachAdmission);
            school.AdmitStudent(Student.CreateRandom());
        }
    }

    // Argument checks shared by the blocking and the asynchronous fill.
    private static void ValidateArguments(School school, int howMany)
    {
        if (school == null)
        {
            throw new ArgumentNullException(nameof(school));
        }
        if (howMany < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(howMany));
        }
    }
}

相关内容

  • 没有找到相关文章

最新更新