我有一个从常规.NET事件生成的Observable。Observable是热的,而不是热的,因为它甚至在任何订阅之前就开始产生价值,每次有人订阅它都会收到最新产生的价值。让我们把这个命名为eventStream
。
然后我有另一个Observable,由另一个类公开,它表示一些状态流,所以每个新值都给出该类管理的东西的当前状态。这个Observable也很热。让我们把它命名为stateStream
。
每次事件序列产生一个新值时,我都想选择(我会说sample,但这可能会导致混淆)状态序列提供的最新值。这应该会产生一个新的序列,将两个值组合起来,然后处理它们,等等。
这就是我想到的,但它似乎不起作用:
var eventStream = Observable.FromEventPattern<MyEventArgs>(/*...*/);
var stateStream = someDependency.SomeStateStream;
eventStream.Select(eventValue =>
stateStream
.Take(1)
.Select(stateValue => new { Event = eventValue, State = stateValue }))
.Switch()
.Do(value => _logger.Trace("{{ {0}, {1} }}", value.Event, value.State))
.Subscribe(value => /* do something */);
这背后的原理来自我处理过的其他类似场景,在这些场景中,某个源产生的新值会导致运行新的订阅,因此返回新的可观测值,最后使用Switch()
或一些类似的运算符将IObservable<IObservable<...>>
压缩为一维可观测的值
但在这种情况下,通过快速测试,似乎没有新的订阅,只生成第一个stateStream
值。相反,我希望每次eventStream
触发时都选择第一个值(Take(1)
)。
AFAIK、CombineLatest
和Zip
不能满足要求:每当两个序列中的一个提供新值时,CombineLatest
就会激发;每当两个序列都有新的值可用时,Zip
就会触发,这意味着当两个序列中最慢的一个有值时。由于与CCD_ 13相同的原因,CCD_。
我还检查了SO线程将一个可观察到的与另一个可观测到的最新组合,但我认为这不适用于这里。我只在一条评论中读到
[…],然后扫描就像CombineLatest一样,只从一侧过滤通知
不知怎么的,这听起来很熟悉,但我无法理解。
我想你想要Observable.Sample()
stateSource.Sample(eventSource)
.Zip(eventSource,...)
在我看来,您当前的解决方案实际上非常接近,但听起来您只需要交换eventStream
&CCD_ 15可观测到周围的CCD_。
试试这个:
stateStream
.Select(stateValue =>
eventStream
.Select(eventValue => new { Event = eventValue, State = stateValue }))
.Switch()
.Do(value => _logger.Trace("{{ {0}, {1} }}", value.Event, value.State))
.Subscribe(value => { /* do something */ });
根据stateStream
的配置方式,您可能需要向其添加.StartWith(...)
,以从eventStream
中获得初始值,但我认为这种方法可以满足您的要求。
您需要状态流的默认值,以防它从未发出。将此默认值作为参数传递给MostRecent
(我刚刚在这里使用了null
),并使用Zip
的重载,该重载需要IEnumerable
:
eventStream.Zip(
stateStream.MostRecent(null),
(evt,state) => new { Event = evt, State = state })
.Subscribe(/* etc */);
Rx.Net 2.3.0-beta2的最新(测试版)具有WithLatestFrom
:
//Only emits when eventStream emits
eventStream.WithLatestFrom(stateStream.StartWith(defaultState),
(evt, state) => new {Event = evt, State = state})
.Subscribe(/*Do something*/);
如果没有,你可以使用(注意未经测试):来填补它
public static IObservable<TResult> WithLatestFrom<TLeft, TRight, TResult>(
this IObservable<TLeft> source,
IObservable<TRight> other,
Func<TLeft, TRight, TResult> resultSelector)
{
return source.Publish(os =>
other.Select(a => os
.Select(b => resultSelector(b,a)))
.Switch());
}
来源由@JamesWorld提供,如果我没有弄错的话。