如何使用RxJS WebSocketSubject重新连接Websocket服务器



我找到了RxJS WebSocketSubject自动重连的各种解决方案,但都不适合我。

在我的RxJS 6.5/Angular代码中有一个connectToWebsocket()函数,它在启动时执行一次,包含如下:

    const openObserver: NextObserver<Event> = {
        next: () => console.debug('[Websocket] Connected')
    };
    const closeObserver: NextObserver<CloseEvent> = {
        next: () => console.debug('[Websocket] Disconnected')
    };
    // Connect to WebSocket Server
    this.websocketConnection= new WebSocketSubject({
        url: "ws://localhost:4000",
        deserializer: e => e.data,
        openObserver,
        closeObserver
    });

创建主题后,我订阅它:

    this.websocketConnection.subscribe((msg) => {
        this.data = msg;
    },
    (err) => {
        console.log("Error on on connection", err);
    },
    () => {
        console.log("Connection closed");
    });

所以现在,如果在初始连接或连接时,连接将在任何情况下关闭。如何重新连接websocket服务器?我需要写一个自己的重连接策略还是已经有一个在RxJS?

提前感谢!

这是Angular Service对web sockets的支持。特点:

  • 5秒后重新连接错误
  • 授权
    import { Injectable } from "@angular/core";
    import { Subject, timer } from "rxjs";
    import { webSocket, WebSocketSubject } from "rxjs/webSocket";
    import { tap, retryWhen, delayWhen } from "rxjs/operators";
    
    import { ApiConfigService } from "../../api-config.service";
    import { AuthStorageService } from "../../auth/auth-storage.service";
    
    @Injectable({
        providedIn: 'root'
    })
    export class WebsocketService {
      private RECONNECT_INTERVAL: number = 5000;
    
      private socketsBaseUrl: string;
      private socketClientId: string
      private socket$: WebSocketSubject<any>;
      private messages$: Subject<MessageEvent>;
    
      constructor(apiConfigService: ApiConfigService, private authStorageService: AuthStorageService) {
        this.socketsBaseUrl = apiConfigService.socketsBaseUrl;
      }
    
      public connect(jobId: string, cfg: { reconnect: boolean } = { reconnect: false }): Subject<MessageEvent> {
        if (!this.socket$ || this.socket$.closed) {
          this.socket$ = this.createNewWebSocket(jobId);
          this.messages$ = <Subject<MessageEvent>>this.socket$.pipe(
            tap({
                error: error => console.log(error),
            }),
            retryWhen(errors =>
                errors.pipe(
                    tap(val => console.log('[WebSocket] trying to reconnect', val)),
                    delayWhen(_ => timer(this.RECONNECT_INTERVAL))
                )
            )
          );
        }
        return this.messages$;
      }
    
      private createNewWebSocket(jobId: string): WebSocketSubject<any> {
        const clientId: number = Date.now();
        this.socketClientId = `${clientId}-job${jobId}`
        const authHeader: string = encodeURIComponent(this.authStorageService.getAuthorizationHeaderValue());
        const socketsUrl: string = `${this.socketsBaseUrl}/${this.socketClientId}?authorization=${authHeader}`;
        return webSocket({
            url: socketsUrl,
            openObserver: {
                next: () => {
                  console.log('[WebSocket] connection established');
                }
              },
            closeObserver: {
                next: () => {
                  console.log('[WebSocket] connection closed');
                  this.socket$ = undefined;
                  this.connect(jobId, { reconnect: true });
                }
            },
        });
      }
    
      public close(): void {
          if (this.socket$ !== undefined) {
            this.socket$.complete();
            this.socket$ = undefined;
            console.log(`[WebSocket] connection closed`);
          }
      }
    }

this.connect(jobId, { reconnect: true }); - reconnect: true是您正在寻找的选项

最新更新