如何构建一个在上一个 ajax 承诺解析后等待一段时间的 rx 轮询器



一直在研究一些方法。基本上,我不想要一个从轮询开始每 30 秒启动一次 ajax 的轮询器 - 我想要一个在上一个请求返回后 30 秒启动请求的轮询器。另外,我想围绕失败的指数退避制定一些策略。

这是我到目前为止所拥有的(Rx4):

rx.Observable.create(function(observer) {
    var nextPoll = function(obs) {
      // the action function invoked below is what i'm passing in
      // to my poller service as any function which returns a promise
      // and must be invoked each loop due to fromPromise caching
      rx.Observable.fromPromise(action())
        .map(function (x){ return x.data; })
        .subscribe(function(d) {       
            // pass promise up to parent observable  
            observer.onNext(d);
            // reset interval in case previous call was an error
            interval = initInterval;   
            setTimeout(function(){ nextPoll(obs); }, interval);
        }, function(e) {
          // push interval higher (exponential backoff)
          interval = interval < maxInterval ? interval * 2 : maxInterval;
          setTimeout(function(){ nextPoll(obs); }, interval);
        });
    };
    nextPoll(observer);
});

在大多数情况下,这符合我的意愿。我不喜欢使用 setTimeout,但我似乎找不到更好的可观察方法(除了另一个订阅的一次性间隔/计时器)。

我无法解决的另一件事是控制轮询器在最初启动时是否可以延迟启动或立即触发的能力。对于某些用途,我将在开始轮询之前获取数据,因此我可以让它在第一次触发之前等待间隔。到目前为止,我只对发生在第一个 ajax 之前或 ajax 之间的计时器/延迟很幸运,并将其提供给订阅者,这对我不起作用。

将不胜感激任何关于清理它的想法,无论是一般还是在摆脱设置超时方面。而且,如果有人有办法以可选的延迟启动此轮询器,那将是巨大的!谢谢大家!!

更新:终于按照我设想的方式工作了。如下所示:

function computeInterval(error) {
  if (error) {
    // double until maximum interval on errors
    interval = interval < maxInterval ? interval * 2 : maxInterval;
  } else {
    // anytime the poller succeeds, make sure we've reset to
    // default interval.. this also allows the initInterval to 
    // change while the poller is running
    interval = initInterval;
  }
  return interval;
}
poller$ = rx.Observable.fromPromise(function(){ return _this.action(); })
  .retryWhen(function(errors){
    return errors.scan(function(acc, x) { return acc + x; }, 0)
      .flatMap(function(x){ 
        return rx.Observable.timer(computeInterval(true));
      });
  })
  .repeatWhen(function(notification){
    return notification
      .scan(function(acc, x) { return acc + x; }, 0)
      .flatMap(function(x){ 
        return rx.Observable.timer(computeInterval());
      });
  });

只是给了它一些快速的想法,所以它必须进行测试,但希望它能让你走上一个有价值的轨道:

var action; // your action function
Rx.Observable.create(function (observer) {
    function executeAction(action) {
        return Rx.Observable.fromPromise(action()).materialize();
    }
    function computeDelay(){
        // put your exponential delaying logic here
    }
    executeAction()
        .expand(function (x) {
            return Rx.Observable.return({})
                .delay(computeDelay())
                .flatMap(function(){return executeAction(action);})
        })
        .subscribe(function(notification){
             if (notification.kind === "N") {
               observer.onNext(notification.value.data);
             } else if (notification.kind === "E") {
               console.log("error:", notification.error.message);
             }
        });
});

简而言之,这个想法是使用 expand 运算符进行循环,使用 delay 运算符进行延迟。查看文档。错误使用 materialize 运算符和通知机制进行管理(这样可以避免在 promise 返回错误的情况下突然终止轮询流)。

我使用另一种可能有用和/或更简单的技术添加另一个答案。它基于repeatWhen运算符。正如文档所解释的那样,它允许您重复订阅可观察量,直到您发出重复结束的信号,指示正常完成或错误。

var action; // your action function
Rx.Observable.create(function (observer) {
    function executeAction(action) {
        return Rx.Observable.fromPromise(action());
    }
    function computeDelay(){
        // put your exponential delaying logic here
    }
executeAction()
  .repeatWhen(function(notification){
    return Rx.Observable.return({}).delay(computeDelay());
  })
  .subscribe(function(x){
    observer.onNext(x.data);
  })
});

这里有一篇很棒的文章解释了repeatWhenretryWhen比官方文档更好(它适用于 RxJava,但它适用于 Rxjs,只需稍作修改)。它还给出了您想要实现的目标(指数延迟重复)的示例。

最新更新