RXJS-控制多个并行执行



我有一个场景,我必须控制多个PTZ摄像机以拍摄每个角度的照片。因此,例如:
Camera A将呈角度 A1A2A3
Camera B将呈角度B1B2B3B4

将相机移动到正确的角度,捕获图像并上传图像是返回承诺的异步功能。
moveCamera( angle )
captureImage()
uploadImage()

摄像机必须并行操作,但是每个相机所取的角度必须按顺序完成。

我以某种方式觉得这可以通过RXJ轻松解决,但是我正在努力将它们拼凑在一起。我能做到的最好的是类似于以下解决方案,以某种方式使摄像机彼此顺序进行。请注意,我使用redux-observable,下面的代码是我在 plain rxjs中的最好的代码。请原谅我的rxjs。

const angles = {
  'Camera A': [ 'A1', 'A2', 'A3' ],
  'Camera B': [ 'B1', 'B2', 'B3', 'B4' ],
}
const cameras = of( [ 'Camera A', 'Camera B' ] );
const cameraRun = cameras.pipe(
  mergeMap( camera => {
    // in redux-observable, I could return an array here
    return of( angles[ camera ] );
  } )
);
cameraRun.pipe(
  concatMap( angle => {
    return moveCamera( angle )
      .then( () => captureImage() )
      .then( () => uploadImage() )
      .then( () => console.log( 'Image success' ) );
  } )
)

对于那些了解redux-observable的人,我有3个史诗:
RUN_CAMERA_SET_ROUTINE-在mergeMap中运行所有摄像机
RUN_CAMERA_ROUTINE-在mergeMap中为每个相机运行所有角度
CAPTURE_IMAGE-在一个concatMap

内运行上方的异步功能

我最初的想法是CAPTURE_IMAGE由于mergeMap产卵流而被"分组",但我错了。看来CAPTURE_IMAGE仍然是所有摄像机的每个角度排队的流。

任何指针都会非常有帮助。

您的问题归结为以顺序执行某些可观察到的物品,并从承诺中创建可观察到的物品。

  1. 并行执行多个可观察物的使用:
  • forkJoin,如果您只希望最终观察到一旦完成所有相机操作,就可以发射
  • merge,如果您希望最终观察到每次单个相机动作成功
  • 时发出
  1. 使用 concat按顺序执行多个可观察结果。

  2. 使用defer从承诺中创建可观察的可观察,但不要立即执行承诺。

您必须

  • 构造要按顺序执行的可观察到的数组。
    (单个相机的每个角度的动作)
  • 构造要并行执行的可观察到的数组。
    (每个相机的相机动作)。

这可能是普通RXJS

中的代码
import { concat, forkJoin, merge, defer } from 'rxjs';
const cameras = ['Camera A', 'Camera B'];
const cameraAngles = { 
  'Camera A': ['A1', 'A2', 'A3'], 
  'Camera B': ['B1', 'B2', 'B3', 'B4'] 
}
// Performs a camera action consisting of multiple parts. Returns a Promise.
// camera: e.g. 'Camera A', angle: e.g. 'A1'
const doCameraAction = (camera, angle) => moveCamera(angle)
  .then(() => captureImage())
  .then(() => uploadImage())
  .then(() => console.log('Image success'));
// Creates an Observables that executes multiple camera actions in sequence.
// camera: e.g. 'Camera A', angles: e.g. ['A1', 'A2', 'A3']
const getCameraActionSequence$ = (camera, angles) => concat(
  // the array of Observables we want to execute in sequence
  ...angles.map(angle => defer(() => doCameraAction(camera, angle)))
);
// An Observable that will execute multiple camera action sequences in in parallel
const multiCameraActions$ = forkJoin(
  // the array of Observables we want to execute in parallel
  cameras.map(camera => getCameraActionSequence$(camera, cameraAngles[camera]))
);

https://stackblitz.com/edit/rxjs-gj1dny?file= index.ts

我会对此进行镜头。我在stackblitz中提出了一个解决方案,以展示我的思想。在单击按钮开始新运行之前,请单击控制台。

有关此解决方案的一些要点:

  • 我仅使用start$用鼠标单击启动新的运行,这对解决方案并不重要。
  • 我嘲笑了三个摄像机的承诺功能。
  • 我还将camera的变量传递给每个相机功能,但这只是Console.log()可以显示什么相机在做什么。
  • 我没有对redux-observable做任何事情,而是将其保留为Vanilla rxjs
  • 我将照片拍摄到使用concat()的可观察到的序列,而不是像您那样将其留下一系列承诺留下 - 这不是必需的,只是另一种方法来接近它。
  • 我将照相机作为单独的可观察物(cameraA$cameraB$)留下,但这也可以使用一系列相机来完成。

可以随意分叉并将其更改为更接近您想要的东西。

这是Stackblitz中的内容:

import { mergeMap, concatMap, tap } from 'rxjs/operators';
import { fromEvent, from, concat, merge, defer } from 'rxjs';
const moveCamera = (camera, angle) => new Promise(
  (resolve, reject) => { 
    setTimeout(() => {
      console.log(`moved: ${camera} angle: ${angle}`);
      resolve();
    }, 1000) }
);
const captureImage = (camera) => new Promise(
  (resolve, reject) => { 
    setTimeout(() => {
      console.log(`${camera} captured image.`);
      resolve();
    }, 100) }
);
const uploadImage = (camera) => new Promise(
  (resolve, reject) => { 
    setTimeout(() => {
      console.log(`${camera} uploaded image.`);
      resolve();
    }, 2000) }
);

const start$ = fromEvent(document.getElementById('start'), 'click');
const takeAPhoto$ = (camera, angle) => concat(
  defer(() => moveCamera(camera, angle)),
  defer(() => captureImage(camera)),
  defer(() => uploadImage(camera))
);
const cameraA$ = from(['A1', 'A2', 'A3']).pipe(
  concatMap(angle => takeAPhoto$('Camera A', angle))
);
const cameraB$ = from(['B1', 'B2', 'B3', 'B4']).pipe(
  concatMap(angle => takeAPhoto$('Camera B', angle))
);
start$.pipe(
  tap(() => console.log('nnstart new run')),
  mergeMap(() => merge(cameraA$, cameraB$)),
).subscribe();

我希望这会有所帮助。

最新更新