如何有条件地分支rxjs流



我正在尝试模拟任何图像编辑器中的"笔刷"功能。

我有以下流:

  • $pointerDown:指针按下
  • $pointerUp:指针上压
  • $position:电刷的位置
  • $escape:按下退出键

我想做什么

当用户拖动鼠标时,进行临时计算。如果鼠标向上,则提交这些更改。如果按下escape键,则不要提交这些更改。

我目前正在处理的是第一个案例:

$pointerDown.pipe(
r.switchMap(() =>
$position.pipe(
r.throttleTime(150),
r.map(getNodesUnderBrush),
r.tap(prepareChanges),
r.takeUntil($pointerUp),
r.finalize(commitBrushStroke))
)).subscribe()

如何以两种不同的方式结束流?这方面的惯用rxjs是什么?

感谢

关于您的问题,我可以看出随着时间的推移,您需要某种state。这里,你的state是可观察的pointerdown/move/dragging,需要是accumulatedcleared,最后是emitted。当我看到这样的state场景时,我总是喜欢使用扫描运算符:

为了简单的例子,我没有使用您预定义的可观察性。如果您在将特定指针用例调整为类似的用例时遇到问题,我可以尝试更新它,使其更接近您的问题

1.什么可以代表我的状态

这里我使用枚举[status]来稍后对之前发生的事件做出反应,并随着时间的推移对点进行累积[acc]

interface State {
acc: number[],
status: Status
}
enum Status {
init,
move,
finish,
escape
}
const DEFAULT_STATE: State = {
acc: [],
status: Status.init
}

2.编写改变状态的函数

您的需求可以分为:累积[pointerdown$+position$]、完成[pointerrup$]、转义[escape$]

const accumulate = (index: number) => (state: State): State =>
({status: Status.move, acc: [...state.acc, index]});
const finish = () => (state: State): State =>
({status: Status.finish, acc: state.acc})
const escape = () => (state: State): State =>
({status: Status.escape, acc: []})

3.将函数映射到可观测值

merge(
move$.pipe(map(accumulate)),
finish$.pipe(map(finish)),
escape$.pipe(map(escape))
)

4.使用扫描中的功能,将您的状态随时间推移放置

scan((acc: State, fn: (state: State) => State) => fn(acc), DEFAULT_STATE)

5.处理您的突变状态

在这里,我们只想在完成时进行处理,所以我们对其进行过滤

filter(state => state.status === Status.finish),

内部状态样本

  1. move$.next('1'(=状态:{状态:move,acc:〔'1'〕}
  2. escape$.next((=状态:{status:escape,acc:[]}
  3. move$.next('2'(=状态:{status:移动,acc:〔'2'〕}
  4. finish$.next((=状态:{status:finish,acc:〔'2'〕}

运行stackblitz

仅供参考:用rxjs解决状态问题的心态很难实现。但一旦你了解了这个想法背后的流程,你就可以在几乎所有的全州场景中使用它。你将避免副作用,坚持rxjs工作流程,你可以轻松地优化/调试你的代码

非常有趣的问题!

这是我的方法:

const down$ = fromEvent(div, 'mousedown');
const up$ = fromEvent(div, 'mouseup');
const calc$ = of('calculations');
const esc$ = fromEvent(document, 'keyup')
.pipe(
filter((e: KeyboardEvent) => e.code === 'Escape')
);
down$ // The source of the stream - mousedown events
.pipe(
switchMapTo(calc$.pipe( // Do some calculations
switchMapTo(
merge( // `merge()` - want to stop making the calculations when either `up$` or `esc$` emit
up$.pipe(mapTo({ shouldCommit: true })),
esc$.pipe(mapTo({ shouldCommit: false })),
).pipe(first()) // `first()` - either of these 2 can stop the calculations;
)
))
)
.subscribe(console.log)

StackBlitz。

我认为使用toArray()takeUntil()有一种非常简单的方法。我假设,当你说你想要"提交"更改时,你想要收集所有更改并一次性处理它们。否则,同样的方法也适用于buffer()

$pointerDown.pipe(
switchMap(() => $position.pipe(
throttleTime(150),
map(getNodesUnderBrush),
tap(prepareChanges), // ?
takeUntil($pointerUp),
toArray(),
takeUntil($escape),
)
).subscribe(changes => {
...
commitBrushStroke(changes);
})

因此,整个技巧是在toArray之前还是之后完成内链。当您在toArray()之前完成它时,toArray()将发出一个包含迄今为止收集的所有更改的单个数组。如果您在toArray()之后完成,则链将被丢弃,toArray()将丢弃所有内容并取消订阅。

如果我理解正确,当用户按下转义键时,整个流应该被取消订阅,这样,当用户再次开始拖动指针鼠标向下时,流就会再次开始发射。

如果是这种情况,您可能希望尝试switchMap$escapemerge,换句话说,类似于此

const drag$ = $pointerDown.pipe(
r.switchMap(() =>
$position.pipe(
r.throttleTime(150),
r.map(getNodesUnderBrush),
r.tap(prepareChanges),
r.takeUntil($pointerUp),
r.finalize(commitBrushStroke))
))
const brush$ = $escape.pipe(
startWith({}),  // emit any value at start just to allow the stream to start
switchMap(() => drag$)
);
const stopBrush$ = $escape.pipe(tap(() => // do stuff to cancel));
merge(brush$, stopBrush$).subscribe();

整个想法是,无论何时$escape发出,对drag$的前一个订阅都将被取消订阅,并开始新的订阅。同时,可以执行任何逻辑来取消需要取消的内容。

我不能测试这个东西,所以我希望我没有忘记什么。

这里有一个与您的原始解决方案非常相似的其他可能的解决方案:

const start$ = fromEvent(document, "mousedown").pipe(
tap((event: MouseEvent) => this.start = `x: ${event.clientX}, y: ${event.clientY}`)
);
const drag$ = fromEvent(document, "mousemove").pipe(
tap((event: MouseEvent) => (this.move = `x: ${event.clientX}, y: ${event.clientY}`)      )
);
const stop$ = fromEvent(document, "mouseup").pipe(
tap((event: MouseEvent) => (this.stop = `x: ${event.clientX}, y: ${event.clientY}`))
);
const cancel$ = fromEvent(document, "keydown").pipe(
filter((e: KeyboardEvent) => e.code === "Escape"),
tap(() => this.stop = 'CANCELLED')
);
const source$ = start$
.pipe(
mergeMap(() =>
drag$.pipe(
takeUntil(merge(stop$, cancel$))
)
)
)
.subscribe();

流可以以两种方式结束,合并两种takeUntil条件:

takeUntil(merge(stop$, cancel$))

以下是Stacklitz:https://stackblitz.com/edit/angular-gw7gyr

最新更新