我目前正在用rxjs自学响应式编程,我给自己设定了一个挑战,创建一个可观察流,无论如何都会向订阅者发出相同的结果。
我已经记住了给定特定URL的HTTP"GET"流的创建,并且我试图每隔两秒钟对该流进行操作,其结果是对于计时器的每一次滴答,我将从原始流中提取缓存/记忆的HTTP结果。
import superagent from 'superagent';
import _ from 'lodash';
// Cached GET function, returning a stream that emits the HTTP response object
var httpget = _.memoize(function(url) {
var req = superagent.get(url);
req = req.end.bind(req);
return Rx.Observable.fromNodeCallback(req)();
});
// Assume this is created externally and I only have access to response$
var response$ = httpget('/ontologies/acl.ttl');
// Every two seconds, emit the memoized HTTP response
Rx.Observable.timer(0, 2000)
.map(() => response$)
.flatMap($ => $)
.subscribe(response => {
console.log('Got the response!');
});
我确信我必须在那里的某个地方坚持对replay()
的调用,但无论我做什么,每两秒钟都会发起一个新的HTTP调用。我如何构建这个,这样我就可以从一个URL构造一个可观察对象,并让它总是发出相同的HTTP结果到任何后续订阅者?
编辑
我找到了一种方法来获得我想要的结果,但我觉得我错过了一些东西,应该能够用一种更精简的方法重构它:
var httpget = _.memoize(function(url) {
var subject = new Rx.ReplaySubject();
try {
superagent.get(url).end((err, res) => {
if(err) {
subject.onError(err);
}
else {
subject.onNext(res);
subject.onCompleted();
}
});
}
catch(e) {
subject.onError(e);
}
return subject.asObservable();
});
你的第一个代码示例实际上更接近于这样做
var httpget = _.memoize(function(url) {
var req = superagent.get(url);
return Rx.Observable.fromNodeCallback(req.end, req)();
});
然而,这不起作用,因为在fromNodeCallback
中似乎有一个错误。至于工作,直到这是固定的,我认为你实际上是在寻找AsyncSubject
而不是ReplaySubject
。后者可以工作,但前者是专门为这种场景设计的(并且没有数组创建的开销和运行时检查缓存过期的开销,如果这对您很重要的话)。
var httpget = _.memoize(function(url) {
var subject = new Rx.AsyncSubject();
var req = superagent.get(url);
Rx.Observable.fromNodeCallback(req.end, req)().subscribe(subject);
return subject.asObservable();
});
最后,尽管map
很欣赏您的想法,但您可以通过使用直接接受Observable
的flatMap
重载来简化timer
代码:
Rx.Observable.timer(0, 2000)
.flatMap($response)
.subscribe(response => {
console.log('Got the response');
});
除非我把你的问题弄错了,Observable.combineLatest
就是为你做的,它缓存你的可观察对象的最后发出的值。
这段代码发送一次请求,然后每200毫秒给出相同的缓存响应:
import reqPromise from 'request-promise';
import {Observable} from 'rx';
let httpGet_ = (url) =>
Observable
.combineLatest(
Observable.interval(200),
reqPromise(url),
(count, response) => response
);
httpGet_('http://google.com/')
.subscribe(
x => console.log(x),
e => console.error(e)
);