如何使观察者在身份验证后等待创建Websocket



我正在使用rxjs收听来自WebSocket的数据流。为了使用此PubSub插座,我需要首先在另一个插座上进行身份验证,该插座将返回PubSub插座URL。

我现在只使用一个订户工作。我需要添加另一个,但是我当前的方法存在问题,因为两个订户都会触发身份验证并创建第二个PubSub插座。

要清楚,整个应用程序只能有一个身份验证套接字和一个PubSub套接字。但是我需要订户"等待"在尝试使用PubSub插座之前进行身份验证。否则,PubSub插座将不确定(因为我们只知道在运行时是URL)。

这是我目前的尝试:

websocket.service.ts

    private connect(): Observable<IConnectionInfo> {
    if (!this.authObservable) {
      var credentials = {
        "username": "bob",
        "password": "slob"
      }
      this.authObservable = Observable.create((observer) => {
        const socket = new WebSocket(this.authurl);
        socket.onopen = () => {
          console.log("Auth socket opened");
          socket.send(JSON.stringify(credentials));
        }
        socket.onmessage = (event) => {
          var connection_info = <IConnectionInfo>JSON.parse(event.data);
          observer.next(connection_info);
        }
        return () => {
          socket.close(); //invoked on unsubscribe
          console.log("Auth socket closed");
        }
      })
    }
    return this.authObservable;
  }
public open_pubsub(events: string[]): Observable<IPubSubMessage> {
    return this.connect()
      .flatMap((connection_info: IConnectionInfo) => {
        var url = "ws://" + this.hostName + ":" + connection_info.port + "/" + connection_info.ps;
        var subscription = {
          "subscribe": events
        }
        var authenticate_request = {
          "authenticate": connection_info['token']
        }
        if (!this.psObservable) {
          this.psObservable = Observable.create((observer) => {
            const socket = new WebSocket(url);
            socket.onopen = () => {
              console.log("PS socket opened");
              socket.send(JSON.stringify(authenticate_request));
              socket.send(JSON.stringify(subscription));
            }
            socket.onmessage = (event) => {
              var psmsg = <IPubSubMessage>JSON.parse(event.data);
              observer.next(psmsg);
            }
            return () => {
              socket.close(); //invoked on unsubscribe
              console.log("PS socked closed");
            }
          })
        }
        return this.psObservable;
      }
      );
  }

和观察者:

getSystemState(): Observable<string> {
    return this._wsService.open_pubsub([MSG_SYSTEM_STATE_CHANGED])
        .map((response: IPubSubMessage): string => {
                console.log(response.payload);
                return "I wish this worked";
        })
        .catch(this.handleError);
}

任何帮助!

edit 基于似乎已删除的答案,但确实很有用,我修改了代码。问题的这个固定部分,但仍会导致多个插座)

找到了我缺少的东西 - share()操作员的魔力。

因此,对于两个可观察到的东西,我都这样做:

this.authObservable = Observable.create((observer) => {
       ...(etc)...
      }).share();

令人惊讶的是,每个插座只有1个。

最新更新