正在加入Rx流



我正在尝试为一个Rx查询建模,这对我来说不是微不足道的:

  • 房间里有男人和女人
  • 他们进出房间,在房间里有时会改变位置
  • 每个男人在特定的时间可以看一个(或零个)女人
  • 每个人都有以下特性:

    class Man
    {
      public const int LookingAtNobody = 0;
      public int Id { get; set; }
      public double Location { get; set; }
      public int LookingAt { get; set; }
    }
    
  • 每个女性都有以下属性:

    class Woman
    {
      public int Id { get; set; }
      public double Location { get; set; }
    }
    
  • 为了代表男性,我们有了IObservable<IObservable<Man>>,为了代表女性,我们有IObservable<IObservable<Woman>>

你将如何使用Rx生成从男性到女性的矢量:IObservable<IObservable<Tuple<double,double>>>

为了提供帮助,这里有一些简单案例的单元测试:

public class Tests : ReactiveTest
{
    [Test]
    public void Puzzle1()
    {
        var scheduler = new TestScheduler();
        var m1 = scheduler.CreateHotObservable(
            OnNext(100, new Man { Id = 1, Location = 1.0, LookingAt = Man.LookingAtNobody }),
            OnNext(200, new Man { Id = 1, Location = 2.0, LookingAt = 10 }),
            OnCompleted<Man>(300));
        var w1 = scheduler.CreateHotObservable(
            OnNext(150, new Woman { Id = 10, Location = 10.0 }),
            OnNext(250, new Woman { Id = 10, Location = 20.0 }),
            OnCompleted<Woman>(350));
        var men = scheduler.CreateHotObservable(OnNext(50, m1));
        var women = scheduler.CreateHotObservable(OnNext(50, w1));
        var results = runQuery(scheduler, women, men);
        var innerResults = (from msg in results
                            where msg.Value.HasValue
                            select msg.Value.Value).ToArray();
        var expectedVector1 = new[]
                       {
                           OnNext(200, Tuple.Create(2.0, 10.0)),
                           OnNext(250, Tuple.Create(2.0, 20.0)),
                           OnCompleted<Tuple<double,double>>(300),
                       };
        ReactiveAssert.AreElementsEqual(expectedVector1, innerResults[0]);
    }
    [Test]
    public void Puzzle2()
    {
        var scheduler = new TestScheduler();
        var m1 = scheduler.CreateHotObservable(
            OnNext(100, new Man { Id = 1, Location = 1.0, LookingAt = Man.LookingAtNobody }),
            OnNext(200, new Man { Id = 1, Location = 2.0, LookingAt = 10 }),
            OnCompleted<Man>(400));
        var w1 = scheduler.CreateHotObservable(
            OnNext(150, new Woman { Id = 10, Location = 10.0 }),
            OnNext(250, new Woman { Id = 10, Location = 20.0 }),
            OnCompleted<Woman>(350));
        var men = scheduler.CreateHotObservable(OnNext(50, m1));
        var women = scheduler.CreateHotObservable(OnNext(50, w1));
        var results = runQuery(scheduler, women, men);
        var innerResults = (from msg in results
                            where msg.Value.HasValue
                            select msg.Value.Value).ToArray();
        var expectedVector1 = new[]
                       {
                           OnNext(200, Tuple.Create(2.0, 10.0)),
                           OnNext(250, Tuple.Create(2.0, 20.0)),
                           OnCompleted<Tuple<double,double>>(350),
                       };
        ReactiveAssert.AreElementsEqual(expectedVector1, innerResults[0]);
    }
    [Test]
    public void Puzzle3()
    {
        var scheduler = new TestScheduler();
        var m1 = scheduler.CreateHotObservable(
            OnNext(100, new Man { Id = 1, Location = 1.0, LookingAt = Man.LookingAtNobody }),
            OnNext(200, new Man { Id = 1, Location = 2.0, LookingAt = 10 }),
            OnNext(300, new Man { Id = 1, Location = 2.0, LookingAt = Man.LookingAtNobody }),
            OnCompleted<Man>(400));
        var w1 = scheduler.CreateHotObservable(
            OnNext(150, new Woman { Id = 10, Location = 10.0 }),
            OnNext(250, new Woman { Id = 10, Location = 20.0 }),
            OnCompleted<Woman>(350));
        var men = scheduler.CreateHotObservable(OnNext(50, m1));
        var women = scheduler.CreateHotObservable(OnNext(50, w1));
        var results = runQuery(scheduler, women, men);
        var innerResults = (from msg in results
                            where msg.Value.HasValue
                            select msg.Value.Value).ToArray();
        var expectedVector1 = new[]
                       {
                           OnNext(200, Tuple.Create(2.0, 10.0)),
                           OnNext(250, Tuple.Create(2.0, 20.0)),
                           OnCompleted<Tuple<double,double>>(300),
                       };
        ReactiveAssert.AreElementsEqual(expectedVector1, innerResults[0]);
    }
    [Test]
    public void Puzzle4()
    {
        var scheduler = new TestScheduler();
        var m1 = scheduler.CreateHotObservable(
            OnNext(100, new Man { Id = 1, Location = 1.0, LookingAt = Man.LookingAtNobody }),
            OnNext(200, new Man { Id = 1, Location = 2.0, LookingAt = 10 }),
            OnNext(300, new Man { Id = 1, Location = 3.0, LookingAt = 20 }),
            OnNext(400, new Man { Id = 1, Location = 4.0, LookingAt = 20 }),
            OnCompleted<Man>(500));
        var w1 = scheduler.CreateHotObservable(
            OnNext(150, new Woman { Id = 10, Location = 10.0 }),
            OnNext(250, new Woman { Id = 10, Location = 20.0 }),
            OnCompleted<Woman>(350));
        var w2 = scheduler.CreateHotObservable(
            OnNext(155, new Woman { Id = 20, Location = 100.0 }),
            OnNext(255, new Woman { Id = 20, Location = 200.0 }),
            OnNext(355, new Woman { Id = 20, Location = 300.0 }),
            OnCompleted<Woman>(455));
        var men = scheduler.CreateHotObservable(OnNext(50, m1));
        var women = scheduler.CreateHotObservable(OnNext(50, w1), OnNext(50, w2));
        var results = runQuery(scheduler, women, men);
        var innerResults = (from msg in results
                            where msg.Value.HasValue
                            select msg.Value.Value).ToArray();
        var expectedVector1 = new[]
                       {
                           OnNext(200, Tuple.Create(2.0, 10.0)),
                           OnNext(250, Tuple.Create(2.0, 20.0)),
                           OnCompleted<Tuple<double,double>>(300),
                       };
        var expectedVector2 = new[]
                       {
                           OnNext(300, Tuple.Create(3.0, 200.0)),
                           OnNext(355, Tuple.Create(3.0, 300.0)),
                           OnNext(400, Tuple.Create(4.0, 300.0)),
                           OnCompleted<Tuple<double,double>>(455),
                       };
        ReactiveAssert.AreElementsEqual(expectedVector1, innerResults[0]);
        ReactiveAssert.AreElementsEqual(expectedVector2, innerResults[1]);
    }
    private static IEnumerable<Recorded<Notification<IList<Recorded<Notification<Tuple<double, double>>>>>>> runQuery(TestScheduler scheduler, IObservable<IObservable<Woman>> women, IObservable<IObservable<Man>> men)
    {
        // assuming nested sequences are hot
        var vectors =
            from manDuration in men
            join womanDuration in women on manDuration equals womanDuration
            select from man in manDuration
                   join woman in womanDuration on manDuration equals womanDuration
                   where man.LookingAt == woman.Id
                   select Tuple.Create(man.Location, woman.Location);
        var query = vectors.Select(vectorDuration =>
        {
            var vectorResults = scheduler.CreateObserver<Tuple<double, double>>();
            vectorDuration.Subscribe(vectorResults);
            return vectorResults.Messages;
        });
        var results = scheduler.Start(() => query, 0, 0, 1000).Messages;
        return results;
    }
}

(注意:这个问题被交叉发布到Rx论坛:http://social.msdn.microsoft.com/Forums/en-US/rx/thread/e73ae4e2-68c3-459a-a5b6-ea957b205abe)

如果我理解正确,目标是创建一个可观察的"跟随可观察",其中"跟随可观测"从男人开始看女人时开始,到男人停止看女人时结束。"跟随可观察"应该由男性和女性最近位置的元组组成。

这里的想法是使用CombineLatest,它将获得两个可观测值,当其中任何一个产生值时,组合子将针对可观测值的两个最近值进行评估,这将在组合的可观测值中产生值。然而,CombineLatest只有在两个可观察性都已完成时才完成。在这种情况下,我们希望在两个源中的任何一个完成时完成可观测。为了做到这一点,我们定义了以下扩展方法(我不认为这样的方法已经存在,但可能有一个更简单的解决方案):

public static IObservable<TSource>
  UntilCompleted<TSource, TWhile>(this IObservable<TSource> source,
                                       IObservable<TWhile> lifetime)
{
  return Observable.Create<TSource>(observer =>
  {
    var subscription = source.Subscribe(observer);
    var limiter = lifetime.Subscribe(next => { }, () =>
    {
      subscription.Dispose();
      observer.OnCompleted();
    });
    return new CompositeDisposable(subscription, limiter);
  });
}

此方法类似于TakeUntil,但它不是等到lifetime生成值,而是等到lifetime完成。我们还可以定义一个简单的扩展方法,它采用满足谓词的第一条条纹:

public static IObservable<TSource>
  Streak<TSource>(this IObservable<TSource> source,
                       Func<TSource, bool> predicate)
{
  return source.SkipWhile(x => !predicate(x)).TakeWhile(predicate);
}

现在,对于最后一个查询,我们使用CombineLatest将所有男性和所有女性组合在一起,并使用UntilCompleted完成早期可观察到的结果。为了得到"跟随可观察到的",我们选择了男人看着女人的条纹。然后我们简单地将其映射到一个位置元组。

var vectors =
  from manDuration in men
  from womanDuration in women
  select manDuration
  .CombineLatest(womanDuration, (m, w) => new { Man = m, Woman = w })
  .UntilCompleted(womanDuration)
  .UntilCompleted(manDuration)
  .Streak(pair => pair.Man.LookingAt == pair.Woman.Id)
  .Select(pair => Tuple.Create(pair.Man.Location, pair.Woman.Location));

这通过了你的所有测试,但它不能处理这样的场景:男人看着女人10一会儿,然后看着20一会儿,然后再看着10一会儿;仅使用第一个条纹。要观察所有条纹,我们可以使用以下扩展方法,返回可观察到的条纹:

public static IObservable<IObservable<TSource>>
  Streaks<TSource>(this IObservable<TSource> source,
                        Func<TSource, bool> predicate)
{
  return Observable.Create<IObservable<TSource>>(observer =>
  {
    ReplaySubject<TSource> subject = null;
    bool previous = false;
    return source.Subscribe(x =>
    {
      bool current = predicate(x);
      if (!previous && current)
      {
        subject = new ReplaySubject<TSource>();
        observer.OnNext(subject);
      }
      if (previous && !current) subject.OnCompleted();
      if (current) subject.OnNext(x);
      previous = current;
    }, () =>
    {
      if (subject != null) subject.OnCompleted();
      observer.OnCompleted();
    });
  });
}

通过只订阅一次源流,并使用ReplaySubject,该方法既适用于热可观测性,也适用于冷可观测性。现在,对于最后的查询,我们选择所有条纹如下:

var vectors =
  from manDuration in men
  from womanDuration in women
  from streak in manDuration
  .CombineLatest(womanDuration, (m, w) => new { Man = m, Woman = w })
  .UntilCompleted(womanDuration)
  .UntilCompleted(manDuration)
  .Streaks(pair => pair.Man.LookingAt == pair.Woman.Id)
  select streak.Select(pair =>
    Tuple.Create(pair.Man.Location, pair.Woman.Location));

我不确定我是否理解为什么要将男性和女性的位置流建模为IObservable<IObservable<T>>而不仅仅是IObservable<T>,但这可能有效:

public static IObservable<Tuple<double, double>> GetLocationsObservable(IObservable<IObservable<Man>> menObservable, 
                                                                            IObservable<IObservable<Woman>> womenObservable)
{
    return Observable.CombineLatest(
        menObservable.Switch(),
        womenObservable.Switch(),
        (man, woman) => new {man, woman})
            .Where(manAndWoman => manAndWoman.man.LookingAt == manAndWoman.woman.Id)
            .Select(manAndWoman => Tuple.Create(manAndWoman.man.Location, manAndWoman.woman.Location));
}

开关本质上是在按下时"切换"到新的可观察对象,这会使流变平。where和select相当简单。

我偷偷怀疑我对要求有些误解,但我想我会提交我的答案,以防有帮助。

相关内容

  • 没有找到相关文章

最新更新