在调用Action之前,将ngrx/effects中的RXJS操作链接到不同操作的Firebase的策略



我遇到了一种不知道如何处理的情况。这种情况是长时间拉取Firebase,将一些可观察到的数据带回ngrx/effects,然后再次调用Firebase删除一些数据,最后将第一个Firebase返回数据传递给Action,让reducer完成任务。

这是ngrx/效果代码

constructor(private threadsService: ThreadsService, private store: Store<ApplicationState>) {

}

@Effect() newMessages$ = Observable.interval(5000)
.withLatestFrom(this.store.select("uiState"))
.map(([any,uiState]) => uiState)
.do(console.log)
.filter(uiState => uiState.userId)
.switchMap(uiState => this.threadsService.loadNewMessagesForUser(uiState.userId))
.withLatestFrom(this.store.select("uiState"))
.do(console.log)
.switchMap(([messages, uiState]) => this.threadsService.deleteMessagesQueuePerUser(messages, uiState.userId))
.map(messages => new NewMessagesReceivedAction(messages))

第一个this.threadsService.loadNewMessagesForUser(uiState.userId)将获得我最终需要的NewMessagesReceivedAction(messages)的Firebase数据。但在传入消息observable之前,我还需要删除MessagesQueuePerUser,这样下一个间隔将返回空,因为新消息已经被拉入。但上面的代码不起作用,结果很糟糕,错误很多:

错误#1:在筛选器之前没有.do(console.log),此代码为

.filter(uiState => uiState.userId)

将在WebStorm中引发错误-类型"{}"上不存在属性"userId"。)

错误#2:这3行不起作用:

// .withLatestFrom(this.store.select("uiState"))
// .do(console.log)
// .switchMap(([messages, uiState]) => this.threadsService.deleteMessagesQueuePerUser(messages, uiState.userId))

这3行的想法是继续向下传递可观察到的消息,但我也再次需要uiState.userId来删除Firebase中的一些数据:this.threadsService.deleteMessagesQueuePerUser(messages,uiState.uerId)。但第一个Firebase列表中可观察到传递的消息不会向下传递到第二个。这是线程服务代码:

firebaseUpdate(dataToSave) {
const subject = new Subject();
this.sdkDb.update(dataToSave)
.then(
val => {
subject.next(val);
subject.complete();
},
err => {
subject.error(err);
subject.complete();
}
);
return subject.asObservable();
}
loadNewMessagesForUser(uid: string): Observable<Message[]> {
console.log ("We are pulling the server! uid: " + uid);
return this.findMessagesForMessageKeys(this.findMessageKeysPreUserUnread(uid));
}
findMessagesForMessageKeys(messageKeys$:Observable<string[]>): Observable<Message[]> {
return messageKeys$
.map(pspp => pspp.map(messageKey => this.db.object('message/' + messageKey)))
.flatMap(fbojs => Observable.combineLatest(fbojs))
}
deleteMessagesQueuePerUser(messages:Observable<Message[]>, uid:string): Observable<Message[]> {
let dataToSave = {};
dataToSave['MessagesQueuePerUser/' + uid] = null;
this.firebaseUpdate(dataToSave);
return messages;
}
findMessageKeysPreUserUnread(uid: string):Observable<string[]> {
return this.db.list('MessagesQueuePerUser/' + uid)
.map(getKeys => getKeys.map(p => p.$key));
}

有人能给我指一个正确的方向,让我明白如何正确地构建和书写这篇文章吗?

更新以澄清

是的,我有UiState类型。我真正想要的是从这个.threadsService.loadNewMessagesForUser(uiState.userId)中获得NewMessages。在我获得NewMessages(因为这是一个可观察的异步操作)后,我想运行删除操作(更像是副作用中的副作用)。当我触发NewMessagesReceivedAction(messages)操作时,数据应该来自this.threadsService.loadNewMessagesForUser(uiState.userId)。希望这能澄清一下。

我假设您有一个UiState类型。

这是一种方法:

@Effect() newMessages$ = Observable.interval(5000)
.withLatestFrom(this.store.select("uiState"))        
.map(([any,uiState]) => uiState)
.filter(uiState => uiState.userId)
.switchMap(uiState => threadsService.loadNewMessagesForUser(uiState.userId)
.switchMap(newMessages => threadsService.deleteMessagesQueuePerUser(newMessages, uiState.userId)
.map(deletedMessages => newMessages)))
.map(messages => ({ type:'MESSAGES_RECEIVED', payload: messages }));

工作柱塞:https://embed.plnkr.co/BdZOD3XHEXU14GveQUBv/

最新更新