在订阅后添加一个可观察序列



我们正在使用Rx来监视silverlight应用程序中的活动,以便我们可以在一段时间不活动后向用户显示消息。

我们正在将事件(鼠标移动等)转换为可观察对象,然后将这些可观察对象合并在一起以创建单个(allActivity)可观察对象。然后,我们使用一个时间跨度来限制allActivity可观察对象,当系统处于非活动状态一段时间后,就会订阅通知。

我如何在订阅后添加一个新的可观察对象/序列(以便订阅在不取消订阅和重新订阅的情况下拾取它)。

。合并多个序列,节流,订阅。现在给这个已订阅的可观察对象添加一个额外的序列。

示例代码:

private IObservable<DateTime> allActivity;
public void CreateActivityObservables(UIElement uiElement)
{
    // Create IObservables of event types we are interested in and project them as DateTimes
    // These are our observables sequences that can push data to subscribers/ observers 
    // NB: These are like IQueryables in the sense that they do not iterate over the sequence just provide an IObservable type
    var mouseMoveActivity = Observable.FromEventPattern<MouseEventHandler, MouseEventArgs>(h => uiElement.MouseMove += h, h => uiElement.MouseMove -= h)
                                      .Select(o => DateTime.Now);
    var mouseLeftButtonActivity = Observable.FromEventPattern<MouseButtonEventHandler, MouseButtonEventArgs>(h => uiElement.MouseLeftButtonDown += h, h => uiElement.MouseLeftButtonDown -= h)
                                            .Select(o => DateTime.Now);
    var mouseRightButtonActivity = Observable.FromEventPattern<MouseButtonEventHandler, MouseButtonEventArgs>(h => uiElement.MouseRightButtonDown += h, h => uiElement.MouseRightButtonDown -= h)
                                             .Select(o => DateTime.Now);
    var mouseWheelActivity = Observable.FromEventPattern<MouseWheelEventHandler, MouseWheelEventArgs>(h => uiElement.MouseWheel += h, h => uiElement.MouseWheel -= h)
                                       .Select(o => DateTime.Now);
    var keyboardActivity = Observable.FromEventPattern<KeyEventHandler, KeyEventArgs>(h => uiElement.KeyDown += h, h => uiElement.KeyDown -= h)
                                     .Select(o => DateTime.Now);
    var streetViewContainer = HtmlPage.Document.GetElementById("streetViewContainer");
        var mouseMoveHandler = new EventHandler<HtmlEventArgs>(this.Moo);
        bool b = streetViewContainer.AttachEvent("mousemove", mouseMoveHandler);
    var browserActivity = Observable.FromEventPattern<Landmark.QDesk.ApplicationServices.IdleTimeoutService.MouseMoveHandler, HtmlEventArgs>(h => this.MyMouseMove += h, h => this.MyMouseMove -= h).Select(o => DateTime.Now);
    // Merge the IObservables<DateTime> together into one stream/ sequence
    this.allActivity = mouseMoveActivity.Merge(mouseLeftButtonActivity)
                                        .Merge(mouseRightButtonActivity)
                                        .Merge(mouseWheelActivity)
                                        .Merge(keyboardActivity)
                                        .Merge(browserActivity);
}
public IDisposable Subscribe(TimeSpan timeSpan, Action<DateTime> timeoutAction)
{
    IObservable<DateTime> timeoutNotification = this.allActivity.Merge   (IdleTimeoutService.GetDateTimeNowObservable())
                                                                .Throttle(timeSpan)
                                                                    .ObserveOn(Scheduler.ThreadPool);
    return timeoutNotification.Subscribe(timeoutAction);
}

有一个要合并的重载,它接受IObservable>使外部序列为Subject>当你想要添加另一个源时,调用OnNext。Merge操作符将接收源并订阅它:

var xss = new Subject<IObservable<int>>();
xss.Merge().Subscribe(x => Console.WriteLine(x));
xss.OnNext(Observable.Interval(TimeSpan.FromSeconds(1.0)).Select(x => 23 + 8 * (int)x));
xss.OnNext(Observable.Interval(TimeSpan.FromSeconds(0.8)).Select(x => 17 + 3 * (int)x));
xss.OnNext(Observable.Interval(TimeSpan.FromSeconds(1.3)).Select(x => 31 + 2 * (int)x));
...

要做到这一点,最简单的方法是使用中间主题来代替Merge调用。

Subject<DateTime> allActivities = new Subject<DateTime>();
var activitySubscriptions = new CompositeDisposable();
activitySubscriptions.Add(mouseMoveActivity.Subscribe(allActivities));
activitySubscriptions.Add(mouseLeftButtonActivity.Subscribe(allActivities));
//etc ...
//subscribe to activities
allActivities.Throttle(timeSpan)
             .Subscribe(timeoutAction);
//later add another
activitySubscriptions.Add(newActivity.Subscribe(allActivities));

如果Subject类接收到任何OnError或OnCompleted,它将停止从它订阅的任何可观察对象传递OnNext(以及进一步的OnError和OnCompleted)事件。

此方法与示例的主要区别在于,它在创建主题时订阅所有事件,而不是在订阅合并的可观察对象时订阅。由于示例中的所有可观察对象都是热的,因此差异不应该是明显的。

相关内容

  • 没有找到相关文章