用Mono封装阻塞客户端并执行序列



我正在尝试使一个需要执行ssh命令的响应式应用程序。

目前,有一个SSH客户端(基于sshd mina)是阻塞的(也许有一种方法可以以非阻塞的方式使用它,但我不知道)。我的想法是在这个阻塞客户机周围创建一个包装器,这样我就可以将阻塞调用转换为Mono,如下面的代码所示。

public class SshReactiveClient extends AutoCloseable {
private final SshClient sshClient;
public SshReactiveClient(final SshClient sshClient) {
this.sshClient = sshClient;
}
public Mono<SshResponse> open(final Duration timeout) {
return Mono.fromCallable(() -> sshClient.open(timeout))
.subscribeOn(Schedulers.boundedElastic());
}
public Mono<SshResponse> execCommand(final SshCommand command, final Duration timeout) {
return Mono.fromCallable(() -> sshClient.execCommand(command, timeout))
.subscribeOn(Schedulers.boundedElastic());
}
@Override
public void close() throws Exception {
sshClient.close();
}
}

首先,这样进行到底好不好?什么会更好?

第二点是如何编写代码,以便我可以使用前一个命令的响应来执行下一个命令来执行ssh命令的序列?

你的理解是正确的。您需要包装阻塞或同步代码,并在单独的Scheduler上运行它。如果客户端支持异步接口就更好了。

要按顺序执行命令,你需要使用响应式API构建一个流。

execCommand(command1)
.flatMap(res -> 
execCommand(getCommand2(res))
)
.flatMap(res -> 
execCommand(getCommand3(res))
)

根据您的要求,还有许多其他的选择。例如,如果您需要command1 &命令2要执行命令3,你只需"shift"向下流动一层

execCommand(command1)
.flatMap(res1 -> 
execCommand(getCommand2(res1))
.flatMap(res2 -> 
execCommand(getCommand3(res1, res2))
)
)

作为一种替代方法,您可以将构建器模式应用于反应器流,以使用reactor mono在顺序流构建器模式中收集响应

您还可以并行执行command1和command2,并使用来自两者的响应

Mono.zip(command1, command2)
.flatMap(tuple -> 
execCommand(getCommand3(tuple.getT1(), tuple.getT2()))
)

最新更新