将 Scala 未来转变为反应堆通量



我正在调用一个返回Future of Scala的 API,我想在没有阻塞的情况下转换为Reactor Flux,并等待未来的响应。我正在尝试使用EmitterProcessor,当未来成功时它工作正常,但不幸的是,当未来超时时,它不会。

这里的代码

private Flux<D> transformFutureToFlux(Future<T> future) {
EmitterProcessor<D> emitterProcessor = EmitterProcessor.create();
future.onComplete(getOnCompleteFunction(emitterProcessor), defaultExecutionContext());
return Flux.from(emitterProcessor);
}
private <D> OnComplete getOnCompleteFunction(EmitterProcessor<D> emitterProcessor) {
return new OnComplete() {
@Override
public void onComplete(Throwable t, Object result) {
if (t != null) {
processError(t);
} else {
processSucceed(result);
}
}
private void processSucceed(Object result) {
if (result instanceof ConnectorErrorVO) {
publishConnectorErrorException((ConnectorErrorVO) result);
} else {
publishGenericResponse(result);
}
}
private void publishGenericResponse(Object result) {
if (result instanceof List<?>) {
Flux.fromIterable((List<D>) result).subscribe(emitterProcessor);
} else {
Flux.just((D) result).subscribe(emitterProcessor);
}
}
private void publishConnectorErrorException(ConnectorErrorVO result) {
ConnectorErrorVO connectorErrorVO = result;
Flux<D> error = Flux.error(new ConnectorErrorException( String.valueOf(connectorErrorVO.getCode()), connectorErrorVO.getDescription(), connectorErrorVO.getError()));
error.subscribe(emitterProcessor);
}
private void processError(Throwable t) {
ConnectorManagerExecutor.logger.error(null, "Error and recovery from connector manager transaction", t);
if (t instanceof AskTimeoutException) {
Flux.<D>error(new ConnectorErrorException("407", "connector timeout", ConnectorError.TIMEOUT)).subscribe(emitterProcessor);
} else {
Flux.<D>error(new ConnectorErrorException("500", t.getMessage(), ConnectorError.GENERIC)).subscribe(emitterProcessor);
}
}
};

我想做的是正确的?,有更好的方法吗?

问候

如果你返回一个Future,这意味着只有 1 个返回,而不是返回值的系列/流。 如果需要,您可以转换为Flux,但为什么不使用Mono呢?

另外,如果您使用的是 scala,请尝试使用reactor-scala-extensions 的SMono.fromFuture

查看 SMonoTest 以获取示例。

import scala.concurrent.ExecutionContext.Implicits.global
StepVerifier.create(SMono.fromFuture(Future[Long] {
randomValue
}))
.expectNext(randomValue)
.verifyComplete()

你试过Flux.create()吗?您注册一个调用提供的Sink方法的未来回调,而不是使用处理器。例如。对于List情况,您将循环访问列表并通过sink.next(T)方法发出每个值。

最新更新