将一个 websocket 客户端的输出作为输入发送到另一个客户端



在SO上快速搜索未能找到我类似的问题,所以我们来了

我基本上想要 RSocket 的请求通道语法与 Webflux,这样我就可以在 WebSocketClient.execute(( 方法之外处理收到的 Flux 并编写这样的东西(会话仅在订阅返回的通量时打开,正确的错误传播,当入站和出站通量都完成时自动完成和关闭 WS 会话 - 要么由服务器端完成,要么由消费者取消(

服务/f 将其收到的字符串消息包装在 'f(...(' 中: 'str' -> 'f(str('

服务/g 对"g(...("执行相同的操作,并且以下测试通过:

private final DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();
private WebSocketMessage serializeString(final String text) {
return new WebSocketMessage(Type.TEXT, dataBufferFactory.wrap(text.getBytes(StandardCharsets.UTF_8)));
}
@Test
void test() {
var requests = 5;
var input = Flux.range(0, requests).map(String::valueOf);
var wsClient = new ReactorNettyWebSocketClient(
HttpClient.from(TcpClient.create(ConnectionProvider.newConnection())));
var f = requestChannel(wsClient, fUri, input.map(this::serializeString))
.map(WebSocketMessage::getPayloadAsText);
var g = requestChannel(wsClient, gUri, f.map(this::serializeString))
.map(WebSocketMessage::getPayloadAsText);
var responses = g.take(requests);
var expectedResponses = Stream.range(0, requests)
.map(i -> "g(f(" + i + "))")
.toJavaArray(String[]::new);
StepVerifier.create(responses)
.expectSubscription()
.expectNext(expectedResponses)
.verifyComplete();
}

这似乎对我有用...迄今

public static Flux<WebSocketMessage> requestChannel(
WebSocketClient wsClient, URI uri, Flux<WebSocketMessage> outbound) {
CompletableFuture<Flux<WebSocketMessage>> recvFuture = new CompletableFuture<>();
CompletableFuture<Integer> consumerDoneCallback = new CompletableFuture<>();
var executeMono = wsClient.execute(uri,
wss -> {
recvFuture.complete(wss.receive().log("requestChannel.receive " + uri, Level.FINE));
return wss.send(outbound)
.and(Mono.fromFuture(consumerDoneCallback));
}).log("requestChannel.execute " + uri, Level.FINE);
return Mono.fromFuture(recvFuture)
.flatMapMany(recv -> recv.doOnComplete(() -> consumerDoneCallback.complete(1)))
.mergeWith(executeMono.cast(WebSocketMessage.class));
}

如果这个解决方案有任何缺陷,我还没有偶然发现,我很感兴趣

相关内容

最新更新