如何使用 rxjs6 轮询作业的结果并在状态完成时终止



>想象一个有三个端点的服务。

  1. 提交作业并取回作业 ID
  2. 提交状态请求并取回状态
  3. 提交结果请求并取回结果

我需要一个解决方案来提交作业并轮询结果,直到作业状态完成。由于状态和结果终结点是独立的,因此我需要确保在确定状态为 DONE 后发出最后一个结果请求。

我尝试使用许多 rxjs 运算符无济于事。我可以轻松进行轮询,但困难的部分是在正确的时间停止轮询。当我使用 takeWhile 或 takeUntill 时,我的轮询在我得到结果之前就停止了。

return submitJob(request).pipe(
  delay(500), // Delay start of poll
  switchMap(job => getResults(job).pipe(
    repeatWhen(c => c.pipe(delay(1000))), // Keep getting results every second
    takeUntil(getStatus(job).pipe(
      filter(status => status.done)
    ))
  )
);

问题是,作业的状态立即完成,结果调用被取消。我的代码从未实际提交结果请求。我需要至少拨打一次电话才能在状态恢复完成后获得结果。

您可以使用计时器运算符来调度请求,然后在收到第一个"完成"状态时停止,请参阅以下示例:

const { of, interval, timer } = rxjs; // = require("rxjs")
const { tap, take, filter, mergeMap } = rxjs.operators; // = require("rxjs/operators")
// simulate API methods
const submitJob = id => of('newjob');
let tries = 0;
function getStatus() {
  tries += 1;
  if (tries === 5) return of('done');
  return of('working...');
}
const getResult = () => of('result');
const job$ = submitJob().pipe(
  tap(e => console.log('started job: ', e)),
  // wait 500ms and then make request every 1000ms
  mergeMap(e => timer(500, 1000).pipe(
    // request status every 1 second
    mergeMap(t => getStatus()),
    tap(e => console.log('status: ', e)),
    // when 1 'done' received - stop polling
    filter(status => status === 'done'),
    take(1)
  )),
  mergeMap(e => getResult())
)
job$.subscribe(e => console.log('completed: ', e));
<script src="https://unpkg.com/rxjs@6.3.3/bundles/rxjs.umd.min.js"></script>

最新更新