带有 Spring 引导的 RSocket 通道 - 客户端错过了自己的第一条消息



假设我有一个简单的RSocket和Spring Boot Server。服务器将所有传入的客户端消息广播到所有连接的客户端(包括发送方(。客户端和服务器如下所示:

服务器:

public RSocketController() {
this.processor = DirectProcessor.<String>create().serialize();
this.sink = this.processor.sink();
}
@MessageMapping("channel")
Flux<String> channel(final Flux<String> messages) {
this.registerProducer(messages);
// breakpoint here
return processor
.doOnSubscribe(subscription -> logger.info("sub"))
.doOnNext(message -> logger.info("[Sent] " + message));
}
private Disposable registerProducer(Flux<String> flux) {
return flux
.doOnNext(message -> logger.info("[Received] " + message))
.map(String::toUpperCase)
// .delayElements(Duration.ofSeconds(1))
.subscribe(this.sink::next);
}

客户:

@ShellMethod("Connect to the server")
public void connect(String name) {
this.name = name;
this.rsocketRequester = rsocketRequesterBuilder
.rsocketStrategies(rsocketStrategies)
.connectTcp("localhost", 7000)
.block();
}
@ShellMethod("Establish a channel")
public void channel() {
this.rsocketRequester
.route("channel")
.data(this.fluxProcessor.doOnNext(message -> logger.info("[Sent] {}", message)))
.retrieveFlux(String.class)
.subscribe(message -> logger.info("[Received] {}", message));
}
@ShellMethod("Send a lower case message")
public void send(String message) {
this.fluxSink.next(message.toLowerCase());
}

问题是:客户端发送的第一条消息由服务器处理,但不会再次到达发送方。所有后续消息的传递都没有任何问题。已连接的所有其他客户端将收到所有消息。

到目前为止我在调试时注意到的

  • 当我在客户端中调用 channel(( 时,调用 retrieveFlux(( 和 subscribe((。但在服务器上,断点不会在相应的方法中触发。
  • 仅当客户端使用 send(( 发送第一条消息时,才会在服务器上触发断点。
  • 在服务器上使用 .delayElements(( 似乎可以"解决"问题。

我在这里做错了什么? 为什么它首先需要 send(( 来触发服务器断点?

提前感谢!

>DirectProcessor没有缓冲区。如果没有订阅者,则会丢弃消息。

(引用其Javadoc:如果没有订阅者,上游项目将被丢弃(

我认为当RSocketController.registerProducer()调用flux.[...].subscribe()时,它会立即开始处理来自flux的传入消息并将它们传递给处理器的接收器,但尚未订阅处理器。因此,消息将被丢弃。

我猜对处理器的订阅是由框架完成的,从RSocketController.channel(...)方法返回后。 -- 我认为您可以在processor.doOnSubscribe(..)方法中设置断点以查看它实际发生的位置。

因此,也许将registerProducer()调用移动到processor.doOnSubscribe()回调中可以解决您的问题,如下所示:

@MessageMapping("channel")
Flux<String> channel(final Flux<String> messages) {
return processor
.doOnSubscribe(subscription -> this.registerProducer(messages))
.doOnSubscribe(subscription -> logger.info("sub"))
.doOnNext(message -> logger.info("[Sent] " + message));
}

但我认为我个人更愿意用UnicastProcessor.create().onBackpressureBuffer().publish()代替DirectProcessor.因此,向多个订阅者进行的广播被移动到单独的操作中,以便在接收器和订阅者之间可以有一个缓冲区,并且可以以更好的方式处理延迟订阅者和背压。

相关内容

最新更新