使用一个主题来传播完全不同的事件流



我需要通过一个主题代理所有不同的事件流。

我想出了这个代码:

var mySubject,
    getObservable;
getObservable = function (subject, eventName) {
    return subject
        .asObservable()
        .filter(function (x) {
            return x.EventName === eventName;
        })
        .flatMap(function (x) {
            if (x.Type === 'onNext') {
                return Rx.Observable.return(x.Data);
            }
            if (x.Type === 'onError') {
                return Rx.Observable.throw(x.Data);
            }
            return Rx.Observable.empty();
        });
};
mySubject = new Rx.Subject();
getObservable(mySubject, 'foo')
    .subscribe(function(x){ 
        console.log('foo onNext ' + x); 
    }, function(x){ 
        console.log('foo onError ' + x); 
    }, function(){ 
        console.log('foo onComplete');
    });
getObservable(mySubject, 'bar')
    .subscribe(function(x){ 
        console.log('bar onNext ' + x); 
    }, function(x){ 
        console.log('bar onError ' + x); 
    }, function(){ 
        console.log('bar onComplete');
    });
mySubject.onNext({Type: 'onNext', EventName: 'foo', Data: 5});
mySubject.onNext({Type: 'onCompleted', EventName: 'foo'});
mySubject.onNext({Type: 'onNext', EventName: 'bar', Data: 5});
mySubject.onNext({Type: 'onError', EventName: 'bar', Data: 'Error message'});

获得输出:

foo onNext 5
bar onNext 5
bar onError Error message

预期输出:

foo onNext 5  
foo onCompleted
bar onNext 5
bar onError Error message

对于bar事件,这就像一个魅力:onNext将被传播,一旦出现错误,就会调用onError函数,并且事件流结束。然而,我无法让它为onComplete工作。

每当发出完整的通知时,我确实看到Rx.Observable.empty()被调用,但这不会导致调用订阅者onComplete处理程序。相反,它调用其onNext处理程序。

getObservable函数返回一个可观察的对象,该对象订阅了通过subject发送的eventName事件。

let getObservable = function (subject, eventName) {
    return Rx.Observable.create(function (observer) {
        subject
            .asObservable()
            .filter(function(x) {
                return x.EventName === eventName;
            })
            .map(function(x) {
                if (x.Type === 'onNext') {
                    observer.onNext(x.Data);
                }
                if (x.Type === 'onError') {
                    observer.onError(x.Data);
                }
                if (x.Type === 'onCompleted') {
                    observer.onCompleted();
                }
                return x;
            })
            .subscribe();
    });
};

这是一个使用原始问题数据的工作示例:

var mySubject,
    getObservable;
getObservable = function (subject, eventName) {
    return Rx.Observable.create(function (observer) {
        subject
            .asObservable()
            .filter(function(x) {
                return x.EventName === eventName;
            })
            .map(function(x) {
                if (x.Type === 'onNext') {
                    observer.onNext(x.Data);
                }
                if (x.Type === 'onError') {
                    observer.onError(x.Data);
                }
                if (x.Type === 'onCompleted') {
                    observer.onCompleted();
                }
                return x;
            })
            .subscribe();
    });
};
mySubject = new Rx.Subject();
getObservable(mySubject, 'foo')
    .subscribe(function(x){ 
        console.log('SomethingHappened onNext ' + x); 
    }, function(x){ 
        console.log('SomethingHappened onError ' + x); 
    }, function(){ 
        console.log('SomethingHappened onComplete');
    });
getObservable(mySubject, 'bar')
    .subscribe(function(x){ 
        console.log('DataUpdated onNext ' + x); 
    }, function(x){ 
        console.log('DataUpdated onError ' + x); 
    }, function(){ 
        console.log('DataUpdated onComplete');
    });
mySubject.onNext({Type: 'onNext', EventName: 'foo', Data: 5});
mySubject.onNext({Type: 'onCompleted', EventName: 'foo'});
mySubject.onNext({Type: 'onNext', EventName: 'bar', Data: 5});
mySubject.onNext({Type: 'onError', EventName: 'bar', Data: 'Error message'});
<script src='https://rawgit.com/Reactive-Extensions/RxJS/master/dist/rx.all.js'></script>

在.NET Observable.SelectMany中使用Observable.Merge将流合并到一个复合可观察对象中。IMHO可观测。只有当任何合并的可观测值完成时,合并才完成。

例如。http://theburningmonk.com/2010/02/rx-framework-iobservable-merge/

这可能是问题的原因。

相关内容

  • 没有找到相关文章

最新更新