如何在函数式响应式编程中对异步回调进行建模?



据我了解,在FRP(函数式响应式编程)中,我们将系统建模为接收一些输入信号并生成一些输出信号的组件:

,------------.
--- input1$ --> |            | -- output1$ -->
|   System   | -- output2$ -->
--- input2$ --> |            | -- output3$ -->
`------------'

这样,如果我们有多个子系统,只要我们能提供可以管道输入和输出的操作员,我们就可以将它们组合在一起。

现在,我正在构建一个应用程序,它异步处理视频帧。实际的处理逻辑是抽象的,可以作为参数提供。在非FRP思维方式中,我可以将应用程序构建为

new App(async (frame) => {
return await processFrame(frame)
})

App负责与底层视频管道建立通信,并重复获取视频帧,然后将该帧传递给给定的回调,一旦回调解析,App发回处理后的帧。

现在我想以FRP方式对App进行建模,以便可以灵活地设计框架处理。

const processedFrameSubject = new Subject()
const { frame$ } = createApp(processedFrameSubject)
frame$.pipe(
map(toRGB),
mergeMap(processRGBFrame),
map(toYUV)
).subscribe(processedFrameSubject)

好处是它使createApp使用者能够以声明方式定义处理管道。

但是,在createApp中,给定一个processedFrame,我需要推理它与哪个frame有关。由于frame$processedFrameSubject现在是分开的,createApp真的很难推理出processedFrame与哪个frame有关,这在非FRP实施中非常容易,因为frameprocessedFrame处于相同的闭包状态。

在函数式反应式编程中,您将尽可能避免使用副作用,这意味着避免.subscribe(tap(() => subject.next())等。使用 FRP,您的状态会声明它的工作原理和连接方式,但在有人需要它并执行副作用之前,它不会执行。

所以我认为以下 API 仍将被视为 FRP:

function createApp(
processFrame: (frame: Frame) => Observable<ProcessedFrame>
): Observable<void>
const app$ = createApp(frame => of(frame).pipe(
map(toRGB),
mergeMap(processRGBFrame),
map(toYUV)
));
// `app$` is an Observable that can be consumed by composing it to other
// observables, or by "executing the side effect" by .subscribe() on it
// possible implementation of createApp for this API
function createApp(
processFrame: (frame: Frame) => Observable<ProcessedFrame>
) {
return new Observable<void>(() => {
const stopVideoHandler = registerVideoFrameHandler(
(frame: Frame) => firstValueFrom(processFrame(frame))
);
return () => {
// teardown
stopVideoHandler()
}
});
}

值得注意的是,createApp正在返回new Observable。在内部new Observable(我们可以摆脱FRP,因为这是我们与外部各方集成的唯一方法,并且我们编写的所有副作用都不会被调用,直到有人.subscribe()可观察的

。这个 API 很简单,仍然是 FRP,但它有一个限制:processFrame回调只能独立于其他方式处理帧。

如果您需要支持该 API,则需要公开frames$,但同样,这是一个用于createApp的项目函数:

function createApp(
projectFn: (frame$: Observable<Frame>) => Observable<ProcessedFrame>
): Observable<void>
const app$ = createApp(frame$ => frame$.pipe(
map(toRGB),
mergeMap(processRGBFrame),
map(toYUV)
));
// possible declaration of createApp
function createApp(
projectFn: (frame$: Observable<Frame>) => Observable<ProcessedFrame>
) {
return new Observable<void>(() => {
const frame$ = new Subject<Frame>;
const processedFrame$ = connectable(frame$.pipe(projectFn));
const processedSub = processedFrame$.connect();
const stopVideoHandler = registerVideoFrameHandler(
(frame: Frame) => {
// We need to create the promise _before_ we send in the next `frame$`,  in case it's processed synchronously
const resultFrame = firstValueFrom(processedFrame$);
frame$.next(frame);
return resultFrame;
})
);
return () => {
// teardown
stopVideoHandler()
processedSub.unsubscribe();
}
});
}

我猜这里registerVideoFrameHandler会一个接一个地调用函数而不重叠?如果存在重叠,则需要以某种方式跟踪帧号,如果SDK没有为您提供任何选项,请尝试以下操作:

// Assuming `projectFn` will emit frames in order. If not, then the API
// should change to be able to match them
const processedFrame$ = connectable(frame$.pipe(
projectFn,
map((result, index) => ({ result, index }))
));
const processedSub = processedFrame$.connect();
let frameIdx = 0;
const stopVideoHandler = registerVideoFrameHandler(
(frame: Frame) => {
const thisIdx = frameIdx;
frameIdx++;
const resultFrame = firstValueFrom(processedFrame$.pipe(
filter(({ index }) => index === thisIdx),
map(({ result }) => result)
));
frame$.next(frame);
return resultFrame;
})
);

最新更新