使用rx.js,如何在计时器上从现有的可观察序列发出记忆结果



我目前正在用rxjs自学响应式编程,我给自己设定了一个挑战,创建一个可观察流,无论如何都会向订阅者发出相同的结果。

我已经记住了给定特定URL的HTTP"GET"流的创建,并且我试图每隔两秒钟对该流进行操作,其结果是对于计时器的每一次滴答,我将从原始流中提取缓存/记忆的HTTP结果。

import superagent from 'superagent';
import _ from 'lodash';
// Cached GET function, returning a stream that emits the HTTP response object
var httpget = _.memoize(function(url) {
  var req = superagent.get(url);
  req = req.end.bind(req);
  return Rx.Observable.fromNodeCallback(req)();
});
// Assume this is created externally and I only have access to response$
var response$ = httpget('/ontologies/acl.ttl');
// Every two seconds, emit the memoized HTTP response
Rx.Observable.timer(0, 2000)
  .map(() => response$)
  .flatMap($ => $)
  .subscribe(response => {
    console.log('Got the response!');
  });

我确信我必须在那里的某个地方坚持对replay()的调用,但无论我做什么,每两秒钟都会发起一个新的HTTP调用。我如何构建这个,这样我就可以从一个URL构造一个可观察对象,并让它总是发出相同的HTTP结果到任何后续订阅者?

编辑
我找到了一种方法来获得我想要的结果,但我觉得我错过了一些东西,应该能够用一种更精简的方法重构它:

var httpget = _.memoize(function(url) {
  var subject = new Rx.ReplaySubject();
  try {
    superagent.get(url).end((err, res) => {
      if(err) {
        subject.onError(err);
      }
      else {
        subject.onNext(res);
        subject.onCompleted();
      }
    });
  }
  catch(e) {
    subject.onError(e);
  }
  return subject.asObservable();
});

你的第一个代码示例实际上更接近于这样做

var httpget = _.memoize(function(url) {
  var req = superagent.get(url);
  return Rx.Observable.fromNodeCallback(req.end, req)();
});

然而,这不起作用,因为在fromNodeCallback中似乎有一个错误。至于工作,直到这是固定的,我认为你实际上是在寻找AsyncSubject而不是ReplaySubject。后者可以工作,但前者是专门为这种场景设计的(并且没有数组创建的开销和运行时检查缓存过期的开销,如果这对您很重要的话)。

var httpget = _.memoize(function(url) {
  var subject = new Rx.AsyncSubject();
  var req = superagent.get(url);
  Rx.Observable.fromNodeCallback(req.end, req)().subscribe(subject);
  return subject.asObservable();
});

最后,尽管map很欣赏您的想法,但您可以通过使用直接接受ObservableflatMap重载来简化timer代码:

Rx.Observable.timer(0, 2000)
  .flatMap($response)
  .subscribe(response => {
    console.log('Got the response');
  });

除非我把你的问题弄错了,Observable.combineLatest就是为你做的,它缓存你的可观察对象的最后发出的值。

这段代码发送一次请求,然后每200毫秒给出相同的缓存响应:

import reqPromise from 'request-promise';
import {Observable} from 'rx';
let httpGet_ = (url) =>
      Observable
      .combineLatest(
        Observable.interval(200),
        reqPromise(url),
        (count, response) => response
      );
httpGet_('http://google.com/')
  .subscribe(
    x => console.log(x),
    e => console.error(e)
  );

最新更新