Rx .net Subscribe() and EventPattern oddity



我将Rx与使用EventPattern的第三方API结合使用。在这个API中,您在对象上注册事件处理程序,然后在启动事件的对象上调用方法StartWatching()以开始触发。我正在使用Observable.FromEventPattern来桥接Rx世界中的API,但我遇到了非常奇怪的问题,即订阅只有在StartWatching()的调用正确的情况下才能工作。下面是我所看到的缩小的情况。

这项工作:

foreach (var iq in interactionQueues)
{
Observable.FromEventPattern(iq, "TheEvent")
.Subscribe(e => Log.Info("I got called!"), 
e => Log.Info("Error!", e),
() => Console.WriteLine("Seq completed!"));
iq.StartWatching();
}

如果我在不同的循环中调用Subscribe()StartWatching(),它将停止工作:

foreach (var iq in interactionQueues)
Observable.FromEventPattern(iq, "TheEvent")
.Subscribe(e => Log.Info("I got called!"), 
e => Log.Info("Error!", e),
() => Console.WriteLine("Seq completed!"));
foreach (var iq in interactionQueues)
iq.StartWatching();

关于为什么会发生这种情况,我唯一的想法是观察或订阅发生在错误的线程上。我试过将Scheduler.CurrentThreadScheduler.ImmediateSubscribeOnObserveOn一起使用,但没有帮助。还有其他想法吗?我应该试试不同的Scheduler还是转移注意力?

让我们用一个更友好的方法来包装它:

public static TheEventArgs WatchEvent(this InteractionQueue this)
{
var ret = Observable.Create<TheEventArgs>(subj => {
// This entire block gets called every time someone calls Subscribe
var disp = new CompositeDisposable();
// Subscribe to the event
disp.Add(Observable.FromEventPattern(iq, "TheEvent").Subscribe(subj));
// Stop watching when we're done
disp.Add(Disposable.Create(() => iq.StopWatching());
iq.StartWatching();
// This is what to Dispose on Unsubscribe
return disp;
});
// When > 1 person Subscribes, only call the block above (i.e. StartWatching) once
return ret.Multicast(new Subject<TheEventArgs>()).RefCount();
}

相关内容

  • 没有找到相关文章

最新更新