使用grpc作为webflux的客户端



我们团队的项目是一个接受请求并将其转发给处理业务机器的产品。我们使用Springboot的webflux作为框架,并使用gRPC作为客户端向商业机器发送请求。

其过程如下:默认用户& lt;环境;Webflux & lt;→gRPC & lt;→实际处理业务的机器

我们被webflux和gRPC的异步非阻塞特性所吸引。gRPC使用流。

我们编写的控制器和服务大致如下:
@PostMapping(”xxxx“)
pulbic Mono<String> mac() {
final COmpletableFuture<String> future = new CompletableFuture<>();
final StreamObserver<MacMessage> request = gRPCService.getStub.mac(new StreamObserver<>() {
String mac;
@Override
public void onNext(MacMessage value) {
mac = Base64.encode(value.getMac.toByteArray());
}
@Override
public void onError(Throwable t) {
future.completeExceptionally(t);
}
@Override
public void onCompleted() {
future.complete(mac);
}
});
request.onNext(MacMessage.newBuilder().setxxxx....build());
request.onCompleted();
return Mono.fromFuture(future);
}

另外,最近在尝试Mono API时,我发现了另一种书写方式,不需要使用CompletableFuture,效果也差不多:

@PostMapping(”xxxx“)
pulbic Mono<String> mac() {
return Mono.create(monoSink -> {
final StreamObserver<MacMessage> request = gRPCService.getStub.mac(new StreamObserver<>() {
String mac;
@Override
public void onNext(MacMessage value) {
mac = Base64.encode(value.getMac.toByteArray());
}
@Override
public void onError(Throwable t) {
monoSink.error(t);
}
@Override
public void onCompleted() {
monoSink.success(mac);
}
});
request.onNext(MacMessage.newBuilder().setxxxx....build());
request.onCompleted();
});
}

我想知道以上的写作方法是否满足异步非阻塞的要求,如果不满足,应该如何使用?

产生这种怀疑的原因是(以上面的代码为例)我们发现,如果我们只使用打印来消耗gRPC StreamObserver#onNext中的响应值,性能远高于上述两种写入方法,并且上述两种写入方法的性能相似。这让我想知道,打印消费响应值和返回响应值的性能有这么大的区别吗?

这里有几种方法可以创建从最简单到最复杂的Mono<GrpcResponse>

首先你需要一个GRPC存根供应商。下面是一个简单的例子,它提供了所有3种存根类型。

@Component
public class GrpcStubProvider implements DisposableBean {
private ConcurrentMap<String, ManagedChannel> channels = new ConcurrentHashMap<>();
public ProfileStub forProfile(String serverId, int port) {
return ProfileGrpc.newStub(obtainChannelFor(serverId, port));
}
public ProfileBlockingStub forProfileBlocking(String serverId, int port) {
return ProfileGrpc.newBlockingStub(obtainChannelFor(serverId, port));
}
public ProfileFutureStub forProfileFutureStub(String serverId, int port) {
return ProfileGrpc.newFutureStub(obtainChannelFor(serverId, port));
}
@Override
public void destroy() {
channels.values().forEach( channel -> channel.shutdown() }
}
private ManagedChannel obtainChannelFor(String serverId, int port) {
return channels.computeIfAbsent(serverId, keyServerId ->
ManagedChannelBuilder.forAddress(keyServerId, port).build()
);
}
}
<标题>阻塞存根h1> 旦你有了提供者,第一个实现可能不是最优雅的,也不是最有效的,但它很容易理解。
Mono.fromSupplier(() -> grpcStubProvider
.forProfileBlocking("myserver", 1234)
.authenticate(req));

这简单地调用带有请求的存根来获得结果。这不能处理流,但是简单的一元调用可以。

Normal stub (Mono)

这利用了单声道。使用流处理程序创建。在这种情况下,onCompleted是无操作的,因为Mono只需要一个值。

Mono.create(sink -> {
final var stub = grpcStubProvider.forProfile("nyserver", 1234); 
stub.authenticate(request, new StreamObserver<ProfileOuterClass.ProfileResponse>() {
@Override
public void onNext(ProfileOuterClass.ProfileResponse value) {
sink.success(value);
}
@Override
public void onError(Throwable t) {
sink.error(t);
}
@Override
public void onCompleted() {
// no-op
}
});
});

正常存根(Flux)

最后一种方法的优点是你可以将其转换为Flux,当你获得流数据时,它对列表更有效。

Flux.create(sink -> {
var stub = grpcStubProvider.forProfile("nyserver", 1234);
stub.authenticate(request, new StreamObserver<ProfileOuterClass.ProfileResponse>() {
@Override
public void onNext(ProfileOuterClass.ProfileResponse value) {
sink.next(value);
}
Override
public void onError(Throwable t) {
sink.error(t);
}
@Override
public void onCompleted() {
sink.complete();
}
});
});

我的建议是不要使用Normal Stub,除非你在做流。复杂性并没有增加多少。

最新更新