我有一个来自服务器发送事件的Observable,它使用以下解决方案返回数据和错误允许从EventSource创建Observable(服务器发送事件(:增强
为了解决连接问题,我想分离
const eventStream = new EventSource(evtSourceAdress);
组成一个单独的可观测
如果连接成功,执行进一步的逻辑
使用的溶液
streamHandler(evtSourceAdress: string) {
return new Observable<MessageEvent<any>>(subscriber => {
const eventStream = new EventSource(evtSourceAdress);
eventStream.onmessage = e => subscriber.next(e);
eventStream.onerror = e => subscriber.error(e);
/* add if else clause if error is returned */
return () => {
if (eventStream.readyState === 1) {
/* TODO add reconnect */
eventStream.close()
}
}
})
}
我修改了代码,将eventStream作为可观察
openEventSource(url: string): Observable<any> {
return new Observable<MessageEvent>(subscribe => {
const sse = new EventSource(url);
// return sse.readyState
})
// return new Observable<MessageEvent>(subscriber => {
// const sse = new EventSource(url);
}
this.eventStreamHandlerServ.openEventSource('http://localhost:3000/sse/event').subscribe(e => console.log(e))
它不会返回任何
添加返回语句return sse.readyState
给我Type 'number' is not assignable to type 'TeardownLogic'.
有没有一种方法可以只将事件作为可观察的?
深思熟虑的答案从(服务器发送的(EventSource 创建RxJS Observable
您可以使用retry
重试连接。
openEventSource(url: string): Observable<any> {
return new Observable<any>((subscriber) => {
const eventSource = new EventSource(url);
eventSource.onmessage = (ev) => subscriber.next(ev.data);
eventSource.onerror = (ev) => subscriber.error(ev);
return () => eventSource.close();
});
}
为了引入重试逻辑,您只需使用上面提到的retry
运算符:
this.eventSourceService
.openEventSource(url)
// retry the connection 5 times, with 500 milliseconds delay
.pipe(retry({ count: 5, delay: 500 }))
.subscribe({
next: (data) => {
console.log('data from event source', data);
},
error: (error) => {
console.log('event source has an error', error);
},
});