Angular NGRX RXJS - 订阅其他内部效果的可观察量不起作用



我有一个问题,我有一个服务来管理SSE,我写这个服务:

getAllNotify(url: string): Observable<EventDTO> {
return new Observable(observer => {
const eventSource = this.getEventSource(url);
eventSource.onopen = function () {
console.log('connection for all data is established');
};
eventSource.onmessage = event => {
this.zone.run(() => {
if (eventSource.readyState !== 0) {
observer.next(JSON.parse(event.data));
}
});
};
eventSource.onerror = error => {
this.zone.run(() => {
observer.error(error);
});
};
});
}
private getEventSource(url: string): EventSource {
return new EventSource(url);
}

app-Component中,我调度一个动作来启动SSE连接,该连接由下面的效果侦听。

在这个效果中,我需要从服务器获得响应,之后我需要订阅我的ngrx store中的其他可观察对象。

我正试图用withLatestFrom操作符来做,但它只适用于第一次;服务器继续发送数据,服务工作,但我没有响应的效果。如果我删除withLatestFrom操作符,我解决了所有问题,它可以正常工作,但我需要订阅其他可观察对象在函数内做一些检查。我错了吗?我不能在效果中使用其他RXJS操作符?

loadEVents3$ = createEffect(() =>
this.actions$.pipe(
ofType('[Websocketevent] Load Websocketevents'),
mergeMap(() =>
this.webSocketEventsService.getAllNotify(`${environment.SERVER_API_URL}/services/my-api-events/api/event`).pipe(
map(res => {
this.eventDto = res;
}),
withLatestFrom(
this.idAccountSelector$,
this.idCompanySelector$,
),
map(res => {
let idAccount = res[0];
let idCompany = res[1];
if (this.eventDto.eventType === EventType.FOLDER_NOTE) {
if (this.eventDto.operationType === OperationType.CREATE) {
return { type: noteEventsActions.ActionNotesType.LOAD_WEB_SOCKET_EVENTS_NOTES, payload: this.eventDto };
} else if (this.eventDto.operationType === OperationType.UPDATE) {
return { type: noteEventsActions.ActionNotesType.UPDATE_WEB_SOCKET_EVENTS_NOTES, payload: this.eventDto };
} else if (this.eventDto.operationType === OperationType.DELETE) {
return { type: noteEventsActions.ActionNotesType.DELETE_WEB_SOCKET_EVENTS_NOTES, payload: this.eventDto };
}
}

}),
catchError(() => EMPTY)
)
)
)
);

我正在尝试使用latestfrom操作符,但它只适用于第一次。

我很惊讶你写的东西第一次就起作用了。我认为不应该。

一个小问题:

map(res => {
this.eventDto = res;
}),

对于RxJS来说,

map(_ => {})

没有明确定义的行为。它将(map)每个响应转换为undefined。这可能不是你想要的。也许tap操作符会更好。您可以在不修改流的情况下创建副作用。

地图和下面的水龙头大致相等:

map(res => {
this.eventDto = res;
return res;
})
tap(res => {
this.eventDto = res;
})

第二个映射操作符也不总是有返回结果。你会看到同样的问题。


一个主要问题:

withLatestFrom增加了源流。

考虑这个设置,它与您的

有点相似
interval(7000).pipe(
withLatestFrom(
interval(5000),
interval(3000)
),
map(vs => vs.map(v => v + 1))
).subscribe(([i7s, i5s, i3s]) => {
console.log(`Source Interval has emitted ${i7s} times`);
console.log(`withLatestFrom 5 second Interval has emitted ${i5s} times`);
console.log(`withLatestFrom 3 second Interval has emitted ${i3s} times`);
});

注意map和subscribe是如何得到一个三值数组的吗?

您的代码将错误的变量赋值给结果的错误部分。let idAccount = res[0];应该总是未定义的。这应该会抛出一个错误然后你会忽略这个错误整个过程会停止

你看到的可能是什么?很难说。

走向解决方案:

你没有给我们一个最小的工作示例,所以我提出的任何解决方案(使用你的代码)都是一个我无法真正测试的解决方案。即便如此!也许可以试试这样做:

loadEVents3$ = createEffect(() =>
this.actions$.pipe(
ofType('[Websocketevent] Load Websocketevents'),
mergeMap(() => this.webSocketEventsService.getAllNotify(
`${environment.SERVER_API_URL}/services/my-api-events/api/event`
).pipe(
tap(res => this.eventDto = res),
filter(eventDto => eventDto.eventType === EventType.FOLDER_NOTE),
withLatestFrom(
this.idAccountSelector$,
this.idCompanySelector$,
),
map(([eventDto, idAccount, idCompany]) => {
// Because of the filter above, we KNOW that
// eventDto.eventType === EventType.FOLDER_NOTE is true here.
let type = null;
if (eventDto.operationType === OperationType.CREATE) {
type = noteEventsActions.ActionNotesType.LOAD_WEB_SOCKET_EVENTS_NOTES;
} else if (eventDto.operationType === OperationType.UPDATE) {
type = noteEventsActions.ActionNotesType.UPDATE_WEB_SOCKET_EVENTS_NOTES;
} else if (this.eventDto.operationType === OperationType.DELETE) { 
type = noteEventsActions.ActionNotesType.DELETE_WEB_SOCKET_EVENTS_NOTES;
}
// Looks like you never use idAccount or idCompany???

return {
type,
payload: eventDto 
};
}),
catchError(() => EMPTY)
)
)
)
);

最新更新