将观察器存储在全局变量中,稍后再使用



如何创建一个可观察量并在代码后面生成下一个值?由于其他异步事件,我希望能够从代码的不同部分调用Next。

这是我尝试过的,它不起作用:

var Rx = require('rx');
var GlobalObserver;
var source = Rx.Observable.create(observer => {
GlobalObserver = observer;
});
var subscription = source.subscribe(
x => console.log('onNext: %s', x),
e => console.log('onError: %s', e),
() => console.log('onCompleted')
);
//...later in the code, as a result of another async event:

GlobalObserver.onNext(someData);
...
GlobalObserver.onNext(someOtherData);

你需要的是某种主题。ReplaySubjectBehaviorSubjectSubject

创建一个主题,然后您可以执行subject.subscribe(...)来订阅它。您还可以执行subject.onNext(...)添加到流中。

例如:

var subject = new Rx.Subject();
var subscription = subject.subscribe(
function (x) { console.log('onNext: ' + x); },
function (e) { console.log('onError: ' + e.message); },
function () { console.log('onCompleted'); }
);
subject.onNext(1);
// => onNext: 1
subject.onNext(2);
// => onNext: 2
subject.onCompleted();
// => onCompleted
subscription.dispose();

一个更具体的用例(每次返回成功的 HTTP 响应时都会添加到可观察流中(:

var httpResponseStream = new Rx.Subject();
var subscription = httpResponseStream.subscribe(function (response) { 
console.log('HTTP response success: ', response); 
});
makeAJAXCall().then(function (response) {
httpResponseStream.onNext(response);
});

正如另一位用户所说,如果您使用的是 V5,请确保将所有onNext更改为next。如果您使用的是 V4,请坚持使用onNext.

我假设这是因为您使用的是 rxjs 版本 ^5.0.0 并阅读了版本 ^4.0.0 的文档。

对于RxJs版本^5.0.0它应该是observer.next(value)而不是observer.onNext(value).

在这里,您可以找到 RxJs 版本^5.0.0的文档