RxJS5 WebSocketSubject -如何过滤和完成消息



我正在寻找与RxJS 5建立WebSocket连接的正确方法的一些指导。我正在连接到一个使用JSON-RPC 2.0的WebSocket。我希望能够执行一个函数,它发送一个请求到WS,并从服务器返回一个相关响应的Observable。

我设置我的初始WebSocketSubject如下:

const ws = Rx.Observable.webSocket("<URL>")

从这个可观察对象中,我已经能够使用ws.next(myRequest)发送请求,并且我已经能够看到通过ws的可观察对象返回的响应。

我一直在努力创建将过滤ws响应到正确响应然后完成的函数。这些似乎完成了源主题,停止了所有未来的ws请求。

我的预期输出是这样的:

function makeRequest(msg) {
    // 1. send the message
    // 2. return an Observable of the response from the message, and complete
}

我试了如下:

function makeRequest(msg) {
    const id = msg.id;
    ws.next(msg);
    return ws
        .filter(f => f.id === id)
        .take(1);
}

但是,当我这样做时,只有第一个请求将工作。随后的请求不会工作,我相信是因为我正在完成take(1) ?

对于这种情况的适当架构有什么想法吗?

如果没有其他订阅者,则在取消订阅时关闭WebSocket似乎是一个错误或故意的设计决策。如果你感兴趣,这里是相关的来源。

本质上你需要保证总是有一个订阅者,否则WebSocket将被关闭。您可以通过两种方式完成此操作。

路由A是更语义化的方式,本质上你创建SubjectObservable部分的发布版本,你有更细粒度的控制。

const ws = Rx.Observable.webSocket("<URL>");
const ws$ = ws.publish();
//When ready to start receiving messages
const totem = ws$.connect();
function makeRequest(msg) {
    const { id } = msg;
    ws.next(msg);
    return ws$.first(f => f.id === id)
}
//When finished
totem.unsubscribe();

路由B是创建一个令牌订阅,它只持有套接字,但根据应用程序的实际生命周期,您可以很好地附加某种关闭事件,以确保它总是关闭。例如

const ws = Rx.Observable.webSocket("<URL>");
const totem = ws.subscribe();
//Later when closing:
totem.unsubscribe();

可以看到,这两种方法非常相似,因为它们都创建了订阅。B的主要缺点是您创建了一个空订阅,它将抽取所有事件,然后将它们丢弃。B的唯一优点是您可以使用相同的变量引用Subject进行发射和订阅,而A必须小心使用ws$进行订阅。

如果你真的这么想,你可以使用Subject创建函数来改进路由A:

const safeWS = Rx.Subject.create(ws, ws$);

以上将允许您使用相同的变量,但您仍然要负责关闭ws$和传递性的WebSocket,当您完成它。

相关内容

  • 没有找到相关文章

最新更新