我正在寻找与RxJS 5建立WebSocket连接的正确方法的一些指导。我正在连接到一个使用JSON-RPC 2.0的WebSocket。我希望能够执行一个函数,它发送一个请求到WS,并从服务器返回一个相关响应的Observable。
我设置我的初始WebSocketSubject如下:
const ws = Rx.Observable.webSocket("<URL>")
从这个可观察对象中,我已经能够使用ws.next(myRequest)
发送请求,并且我已经能够看到通过ws的可观察对象返回的响应。
我一直在努力创建将过滤ws响应到正确响应然后完成的函数。这些似乎完成了源主题,停止了所有未来的ws请求。
我的预期输出是这样的:
function makeRequest(msg) {
// 1. send the message
// 2. return an Observable of the response from the message, and complete
}
我试了如下:
function makeRequest(msg) {
const id = msg.id;
ws.next(msg);
return ws
.filter(f => f.id === id)
.take(1);
}
但是,当我这样做时,只有第一个请求将工作。随后的请求不会工作,我相信是因为我正在完成take(1)
?
对于这种情况的适当架构有什么想法吗?
如果没有其他订阅者,则在取消订阅时关闭WebSocket
似乎是一个错误或故意的设计决策。如果你感兴趣,这里是相关的来源。
本质上你需要保证总是有一个订阅者,否则WebSocket
将被关闭。您可以通过两种方式完成此操作。
路由A是更语义化的方式,本质上你创建Subject
的Observable
部分的发布版本,你有更细粒度的控制。
const ws = Rx.Observable.webSocket("<URL>");
const ws$ = ws.publish();
//When ready to start receiving messages
const totem = ws$.connect();
function makeRequest(msg) {
const { id } = msg;
ws.next(msg);
return ws$.first(f => f.id === id)
}
//When finished
totem.unsubscribe();
路由B是创建一个令牌订阅,它只持有套接字,但根据应用程序的实际生命周期,您可以很好地附加某种关闭事件,以确保它总是关闭。例如
const ws = Rx.Observable.webSocket("<URL>");
const totem = ws.subscribe();
//Later when closing:
totem.unsubscribe();
可以看到,这两种方法非常相似,因为它们都创建了订阅。B的主要缺点是您创建了一个空订阅,它将抽取所有事件,然后将它们丢弃。B的唯一优点是您可以使用相同的变量引用Subject
进行发射和订阅,而A必须小心使用ws$
进行订阅。
如果你真的这么想,你可以使用Subject
创建函数来改进路由A:
const safeWS = Rx.Subject.create(ws, ws$);
以上将允许您使用相同的变量,但您仍然要负责关闭ws$
和传递性的WebSocket
,当您完成它。