假设我有一个简单的RSocket和Spring Boot Server。服务器将所有传入的客户端消息广播到所有连接的客户端(包括发送方(。客户端和服务器如下所示:
服务器:
public RSocketController() {
this.processor = DirectProcessor.<String>create().serialize();
this.sink = this.processor.sink();
}
@MessageMapping("channel")
Flux<String> channel(final Flux<String> messages) {
this.registerProducer(messages);
// breakpoint here
return processor
.doOnSubscribe(subscription -> logger.info("sub"))
.doOnNext(message -> logger.info("[Sent] " + message));
}
private Disposable registerProducer(Flux<String> flux) {
return flux
.doOnNext(message -> logger.info("[Received] " + message))
.map(String::toUpperCase)
// .delayElements(Duration.ofSeconds(1))
.subscribe(this.sink::next);
}
客户:
@ShellMethod("Connect to the server")
public void connect(String name) {
this.name = name;
this.rsocketRequester = rsocketRequesterBuilder
.rsocketStrategies(rsocketStrategies)
.connectTcp("localhost", 7000)
.block();
}
@ShellMethod("Establish a channel")
public void channel() {
this.rsocketRequester
.route("channel")
.data(this.fluxProcessor.doOnNext(message -> logger.info("[Sent] {}", message)))
.retrieveFlux(String.class)
.subscribe(message -> logger.info("[Received] {}", message));
}
@ShellMethod("Send a lower case message")
public void send(String message) {
this.fluxSink.next(message.toLowerCase());
}
问题是:客户端发送的第一条消息由服务器处理,但不会再次到达发送方。所有后续消息的传递都没有任何问题。已连接的所有其他客户端将收到所有消息。
到目前为止我在调试时注意到的
- 当我在客户端中调用 channel(( 时,调用 retrieveFlux(( 和 subscribe((。但在服务器上,断点不会在相应的方法中触发。
- 仅当客户端使用 send(( 发送第一条消息时,才会在服务器上触发断点。
- 在服务器上使用 .delayElements(( 似乎可以"解决"问题。
我在这里做错了什么? 为什么它首先需要 send(( 来触发服务器断点?
提前感谢!
>DirectProcessor
没有缓冲区。如果没有订阅者,则会丢弃消息。
(引用其Javadoc:如果没有订阅者,上游项目将被丢弃(
我认为当RSocketController.registerProducer()
调用flux.[...].subscribe()
时,它会立即开始处理来自flux
的传入消息并将它们传递给处理器的接收器,但尚未订阅处理器。因此,消息将被丢弃。
我猜对处理器的订阅是由框架完成的,从RSocketController.channel(...)
方法返回后。 -- 我认为您可以在processor.doOnSubscribe(..)
方法中设置断点以查看它实际发生的位置。
因此,也许将registerProducer()
调用移动到processor.doOnSubscribe()
回调中可以解决您的问题,如下所示:
@MessageMapping("channel")
Flux<String> channel(final Flux<String> messages) {
return processor
.doOnSubscribe(subscription -> this.registerProducer(messages))
.doOnSubscribe(subscription -> logger.info("sub"))
.doOnNext(message -> logger.info("[Sent] " + message));
}
但我认为我个人更愿意用UnicastProcessor.create().onBackpressureBuffer().publish()
代替DirectProcessor
.因此,向多个订阅者进行的广播被移动到单独的操作中,以便在接收器和订阅者之间可以有一个缓冲区,并且可以以更好的方式处理延迟订阅者和背压。