我找到了RxJS WebSocketSubject自动重连的各种解决方案,但都不适合我。
在我的RxJS 6.5/Angular代码中有一个connectToWebsocket()
函数,它在启动时执行一次,包含如下:
const openObserver: NextObserver<Event> = {
next: () => console.debug('[Websocket] Connected')
};
const closeObserver: NextObserver<CloseEvent> = {
next: () => console.debug('[Websocket] Disconnected')
};
// Connect to WebSocket Server
this.websocketConnection= new WebSocketSubject({
url: "ws://localhost:4000",
deserializer: e => e.data,
openObserver,
closeObserver
});
创建主题后,我订阅它:
this.websocketConnection.subscribe((msg) => {
this.data = msg;
},
(err) => {
console.log("Error on on connection", err);
},
() => {
console.log("Connection closed");
});
所以现在,如果在初始连接或连接时,连接将在任何情况下关闭。如何重新连接websocket服务器?我需要写一个自己的重连接策略还是已经有一个在RxJS?
提前感谢!
这是Angular Service对web sockets的支持。特点:
- 5秒后重新连接错误 授权
import { Injectable } from "@angular/core";
import { Subject, timer } from "rxjs";
import { webSocket, WebSocketSubject } from "rxjs/webSocket";
import { tap, retryWhen, delayWhen } from "rxjs/operators";
import { ApiConfigService } from "../../api-config.service";
import { AuthStorageService } from "../../auth/auth-storage.service";
@Injectable({
providedIn: 'root'
})
export class WebsocketService {
private RECONNECT_INTERVAL: number = 5000;
private socketsBaseUrl: string;
private socketClientId: string
private socket$: WebSocketSubject<any>;
private messages$: Subject<MessageEvent>;
constructor(apiConfigService: ApiConfigService, private authStorageService: AuthStorageService) {
this.socketsBaseUrl = apiConfigService.socketsBaseUrl;
}
public connect(jobId: string, cfg: { reconnect: boolean } = { reconnect: false }): Subject<MessageEvent> {
if (!this.socket$ || this.socket$.closed) {
this.socket$ = this.createNewWebSocket(jobId);
this.messages$ = <Subject<MessageEvent>>this.socket$.pipe(
tap({
error: error => console.log(error),
}),
retryWhen(errors =>
errors.pipe(
tap(val => console.log('[WebSocket] trying to reconnect', val)),
delayWhen(_ => timer(this.RECONNECT_INTERVAL))
)
)
);
}
return this.messages$;
}
private createNewWebSocket(jobId: string): WebSocketSubject<any> {
const clientId: number = Date.now();
this.socketClientId = `${clientId}-job${jobId}`
const authHeader: string = encodeURIComponent(this.authStorageService.getAuthorizationHeaderValue());
const socketsUrl: string = `${this.socketsBaseUrl}/${this.socketClientId}?authorization=${authHeader}`;
return webSocket({
url: socketsUrl,
openObserver: {
next: () => {
console.log('[WebSocket] connection established');
}
},
closeObserver: {
next: () => {
console.log('[WebSocket] connection closed');
this.socket$ = undefined;
this.connect(jobId, { reconnect: true });
}
},
});
}
public close(): void {
if (this.socket$ !== undefined) {
this.socket$.complete();
this.socket$ = undefined;
console.log(`[WebSocket] connection closed`);
}
}
}
this.connect(jobId, { reconnect: true });
- reconnect: true是您正在寻找的选项