return Eventsource Observable from Server发送的角度事件



我有一个来自服务器发送事件的Observable,它使用以下解决方案返回数据和错误允许从EventSource创建Observable(服务器发送事件(:增强

为了解决连接问题,我想分离

const eventStream = new EventSource(evtSourceAdress);组成一个单独的可观测

如果连接成功,执行进一步的逻辑

使用的溶液

streamHandler(evtSourceAdress: string) {
return new Observable<MessageEvent<any>>(subscriber => {
const eventStream = new EventSource(evtSourceAdress);
eventStream.onmessage = e => subscriber.next(e);
eventStream.onerror = e => subscriber.error(e);
/* add if else clause if error is returned */

return () => {
if (eventStream.readyState === 1) {
/* TODO add reconnect  */
eventStream.close()
}
}

})
}

我修改了代码,将eventStream作为可观察

openEventSource(url: string): Observable<any> {
return new Observable<MessageEvent>(subscribe => {
const sse = new EventSource(url);
//  return sse.readyState
})
// return new Observable<MessageEvent>(subscriber => {
//   const sse = new EventSource(url);
}

this.eventStreamHandlerServ.openEventSource('http://localhost:3000/sse/event').subscribe(e => console.log(e))

它不会返回任何

添加返回语句return sse.readyState

给我Type 'number' is not assignable to type 'TeardownLogic'.

有没有一种方法可以只将事件作为可观察的?

深思熟虑的答案从(服务器发送的(EventSource 创建RxJS Observable

您可以使用retry重试连接。

openEventSource(url: string): Observable<any> {
return new Observable<any>((subscriber) => {
const eventSource = new EventSource(url);
eventSource.onmessage = (ev) => subscriber.next(ev.data);
eventSource.onerror = (ev) => subscriber.error(ev);
return () => eventSource.close();
});
}

为了引入重试逻辑,您只需使用上面提到的retry运算符:

this.eventSourceService
.openEventSource(url)
// retry the connection 5 times, with 500 milliseconds delay
.pipe(retry({ count: 5, delay: 500 }))
.subscribe({
next: (data) => {
console.log('data from event source', data);
},
error: (error) => {
console.log('event source has an error', error);
},
});

最新更新