RxJS操作员延迟处理后续事件



我正在寻找一个RxJS运算符(或运算符的组合),它将允许我实现以下目标:

  1. 可观察对象触发的每个事件都应该按照触发顺序进行处理(不跳过、限制或取消触发任何值)
  2. 我应该能够定义处理2个后续事件之间的设置间隔
  3. 如果当前没有处理以前的事件,则应立即处理下一个事件(无延迟)

我创建了以下示例,希望它能让这一点更清楚:

import { Component, OnInit } from "@angular/core";
import { Subject } from "rxjs";
import { delay } from "rxjs/operators";
@Component({
selector: "app-root",
templateUrl: "./app.component.html",
styleUrls: ["./app.component.css"]
})
export class AppComponent implements OnInit {
counter = 1;
displayedValue = 1;
valueEmitter = new Subject<number>();
valueEmitted$ = this.valueEmitter.asObservable();
ngOnInit() {
// I want to allow each incremented value to be displayed for at least 2 seconds
this.valueEmitted$.pipe(delay(2000)).subscribe((value) => {
this.displayedValue = value;
});
}
onClick() {
const nextCounter = ++this.counter;
this.counter = nextCounter;
this.valueEmitter.next(nextCounter);
}
}

在这个例子中,我希望counter中增加的每个值都显示为displayedValue至少2秒。

因此,当用户第一次点击onClick时,显示的值应该立即从1变为2,但如果用户继续快速增加counter,则displyedValue应该以延迟的方式跟随,允许每个先前增加的数字在改变之前显示2秒,慢慢赶上当前计数值。

这可以通过RxJs运算符实现吗?

更新

目前,我想出了以下解决方案:

import { Component, OnInit } from "@angular/core";
import { Subject } from "rxjs";
@Component({
selector: "app-root",
templateUrl: "./app.component.html",
styleUrls: ["./app.component.css"]
})
export class AppComponent implements OnInit {
counter = 1;
displayedValue = 1;
currentDisplayedValueStartTime = 0;
readonly DISPLAY_DURATION = 2000;
valuesToDisplay = [];
displayedValueSwitcherInterval: number | null = null;
valueEmitter = new Subject<number>();
valueEmitted$ = this.valueEmitter.asObservable();
ngOnInit() {
this.valueEmitted$.pipe().subscribe((value) => {
this.handleValueDisplay(value);
});
}
onClick() {
const nextCounter = ++this.counter;
this.counter = nextCounter;
this.valueEmitter.next(nextCounter);
}
handleValueDisplay(value) {
this.valuesToDisplay.push(value);
if (!this.displayedValueSwitcherInterval) {
this.displayedValue = this.valuesToDisplay.shift();
this.displayedValueSwitcherInterval = window.setInterval(() => {
const nextDiplayedValue = this.valuesToDisplay.shift();
if (nextDiplayedValue) {
this.displayedValue = nextDiplayedValue;
} else {
clearInterval(this.displayedValueSwitcherInterval);
this.displayedValueSwitcherInterval = null;
}
}, this.DISPLAY_DURATION);
}
}
}

这不是我想要的,因为它依赖于组件状态和setInterval来管理通知队列。我希望找到一些更干净、更具声明性的东西,它依赖RxJs运算符来处理我的通知流和它们的间隔队列,但至少我得到了完全按照我想要的方式工作的功能。仍然对如何让这件事变得更好持开放态度。

我认为这可能是解决问题的一种方法:

this.valueEmitted$
.pipe(
concatMap(
(v) => concat(
of(v),
timer(2000).pipe(ignoreElements())
)
)
)
.subscribe((value) => {
this.displayedValue = value;
});

因为timer也会发出一些值(0、1、2等等),所以我们使用ignoreElements,因为我们对这些值不感兴趣,并且我们只想从timer得到完整通知,这将指示可以订阅下一个内部可观察对象的时刻。请注意,前面提到的内部可观察对象是通过调用concatMap的回调函数创建的,v是参数。

工作演示


可观察到的每个事件都应该按照触发顺序进行处理(没有跳过、节流、取消任何值)。

这是在concatMap的帮助下确保的。

我应该能够定义处理2个后续事件之间的设置间隔。

我们可以通过:实现这一点

concat(
// First, we send the value.
of(v),
// Then we set up a timer so that in case a new notification is emitted, it will have to
// wait for the timer to end.
timer(2000).pipe(ignoreElements())
)

如果当前没有处理以前的事件,则应立即处理下一个事件(无延迟)。

如果concatMap没有创建活动的内部可观测值,则值v将立即发送给消费者。


这也可以:

this.valueEmitted$
.pipe(
concatMap(
(v) => timer(2000).pipe(
ignoreElements(), startWith(v)
)
)
)
.subscribe((value) => {
this.displayedValue = value;
});

因此您的评论使其更加清晰。

显然,你需要一些状态。因为不能同时显示所有内容,所以用户的屏幕并不是无限的。它不能处理超过。。3..4.5立即通知。

扫描是在rxjs中处理状态的方式,它会发出每个事件。

scan可能包含两个列表:一个用于当前呈现的通知,另一个用于挂起的通知。列表和scan一样可以保持顺序,所以不会违反顺序。

然后你需要能够根据你的事件采取不同的行动。我能想到的最简单的东西是元组(event_type_as_sting_or_number, event_specific_params)。这样,您就能够正确地计算下一个状态(将由scan发出),然后优雅地处理它(就像重新发送通知一样)。

似乎你需要三个事件:

  1. 新的事件来自外层空间
  2. 某些通知已过期,必须从屏幕上删除
  3. 用户确认了一些通知,并没有任何意义保留它

所以整个管道是:merge将所有事件放入一个流中;标记为";通过CCD_ 21;该流上的scan[], []是您的初始状态(没有挂起的通知,也没有活动的通知);if else if else基于您的event_typescan内正确计算下一个状态;获取结果并呈现所有活动通知。


来自评论:

我仍然不知道如何管理此解决方案中的间隔。我的意思是,只有在显示当前活动通知的设定间隔之后,我才能将通知从挂起移动到活动阵列。

基本上有两个选项。由于您不受显示某些通知的给定限制的严格约束,因此您可以简单地使用全局interval来发射,比方说,每个your_limit / 2时间单位。当您将每个通知移动到活动通知列表时,可能会使用简单的时间戳对其进行扩展。下次间隔触发时,您将循环查看该列表,并使用时间增量>= your_limit过滤掉所有通知。这有点不准确,但可以很容易地进行配置和调整,以便对用户透明。我投票支持这种简单的解决方案。

另一种解决方案是,一旦将每个通知放入活动列表,就触发timer。定时器完成后,您发送一个事件(notification_must_die, { notification_id: ... }),您的scan处理该事件;必须准备这样的CCD_ 33不再存在(例如。这很复杂,但要准确得多。

最新更新