当任何值由另一个Observable生成时,选择Observable最新值



我有一个从常规.NET事件生成的Observable。Observable是热的,而不是热的,因为它甚至在任何订阅之前就开始产生价值,每次有人订阅它都会收到最新产生的价值。让我们把这个命名为eventStream

然后我有另一个Observable,由另一个类公开,它表示一些状态流,所以每个新值都给出该类管理的东西的当前状态。这个Observable也很热。让我们把它命名为stateStream

每次事件序列产生一个新值时,我都想选择(我会说sample,但这可能会导致混淆)状态序列提供的最新值。这应该会产生一个新的序列,将两个值组合起来,然后处理它们,等等。

这就是我想到的,但它似乎不起作用:

var eventStream = Observable.FromEventPattern<MyEventArgs>(/*...*/);
var stateStream = someDependency.SomeStateStream;
eventStream.Select(eventValue => 
  stateStream
    .Take(1)
    .Select(stateValue => new { Event = eventValue, State = stateValue }))
  .Switch()
  .Do(value => _logger.Trace("{{ {0}, {1} }}", value.Event, value.State))
  .Subscribe(value => /* do something */);

这背后的原理来自我处理过的其他类似场景,在这些场景中,某个源产生的新值会导致运行新的订阅,因此返回新的可观测值,最后使用Switch()或一些类似的运算符将IObservable<IObservable<...>>压缩为一维可观测的值
但在这种情况下,通过快速测试,似乎没有新的订阅,只生成第一个stateStream值。相反,我希望每次eventStream触发时都选择第一个值(Take(1))。

AFAIK、CombineLatestZip不能满足要求:每当两个序列中的一个提供新值时,CombineLatest就会激发;每当两个序列都有新的值可用时,Zip就会触发,这意味着当两个序列中最慢的一个有值时。由于与CCD_ 13相同的原因,CCD_。

我还检查了SO线程将一个可观察到的与另一个可观测到的最新组合,但我认为这不适用于这里。我只在一条评论中读到

[…],然后扫描就像CombineLatest一样,只从一侧过滤通知

不知怎么的,这听起来很熟悉,但我无法理解。

我想你想要Observable.Sample()

stateSource.Sample(eventSource)
     .Zip(eventSource,...)

在我看来,您当前的解决方案实际上非常接近,但听起来您只需要交换eventStream&CCD_ 15可观测到周围的CCD_。

试试这个:

stateStream
    .Select(stateValue => 
        eventStream
            .Select(eventValue => new { Event = eventValue, State = stateValue }))
    .Switch()
    .Do(value => _logger.Trace("{{ {0}, {1} }}", value.Event, value.State))
    .Subscribe(value => { /* do something */ });

根据stateStream的配置方式,您可能需要向其添加.StartWith(...),以从eventStream中获得初始值,但我认为这种方法可以满足您的要求。

您需要状态流的默认值,以防它从未发出。将此默认值作为参数传递给MostRecent(我刚刚在这里使用了null),并使用Zip的重载,该重载需要IEnumerable:

eventStream.Zip(
    stateStream.MostRecent(null),
    (evt,state) => new { Event = evt, State = state })
.Subscribe(/* etc */);

Rx.Net 2.3.0-beta2的最新(测试版)具有WithLatestFrom:

//Only emits when eventStream emits
eventStream.WithLatestFrom(stateStream.StartWith(defaultState), 
                           (evt, state) => new {Event = evt, State = state})
           .Subscribe(/*Do something*/);

如果没有,你可以使用(注意未经测试):来填补它

public static IObservable<TResult> WithLatestFrom<TLeft, TRight, TResult>(
    this IObservable<TLeft> source,
    IObservable<TRight> other,
    Func<TLeft, TRight, TResult> resultSelector)
{
    return source.Publish(os =>
        other.Select(a => os
            .Select(b => resultSelector(b,a)))
            .Switch());
}

来源由@JamesWorld提供,如果我没有弄错的话。

相关内容

  • 没有找到相关文章