有人可以向我解释一下gRPC StreamObserver.onError的正确用法是什么吗?



我正在尝试正确处理gRPC错误(Java,Spring-boot应用程序)。

基本上,我还需要将错误详细信息从 gRPC 服务器传输到客户端,但我发现很难理解StreamObserver.onError();的正确用法

方法文档说:

"从流接收终止错误。只能调用一次 如果调用它必须是调用的最后一个方法。特别是如果 异常由 onError 的实现引发,没有进一步的调用 允许使用任何方法。

"不允许进一步通话">是什么意思?在我维护的应用程序中,他们调用其他 gRPC 方法,根据文档,它们会得到java.lang.IllegalStateException: call already closed这很好。

我想知道 - 我(开发人员)应该在收到错误后终止当前的 java 方法(usus gRPC 调用)吗?例如,抛出异常以停止执行。或者预计 gRPC 将终止执行。(类似于从 gRPC 引发异常)

基本上,我如何正确使用onError(),如果我调用它,我应该期待和处理什么? 我需要解释它的用法和效果。

涉及两个StreamObserver实例。一个用于入站方向,即实现并传递给 gRPC 库的StreamObserver实例。这是包含如何处理响应的逻辑的StreamObserver。另一个用于出站方向,即gRPC 库在调用 RPC方法时返回给你的StreamObserver实例。这是用于发送请求的StreamObserver。大多数时候,这两个StreamObserver是相互交互的(例如,在全双工流调用中,响应StreamObserver通常调用请求StreamObserveronNext()方法,这就是您实现乒乓行为的方式)。

"不允许进一步调用"意味着当调用入站StreamObserveronError()方法时,您不应再调用出站方向StreamObserveronNext()onComplete()和/或onError(),即使您对入站onError()的实现抛出异常也是如此。由于入站StreamObserver是异步调用的,因此它与包含StreamObserver实现的方法无关。

例如:


public class HelloWorld {
private final HelloWorldStub stub;
private StreamObserver<HelloRequest> requestObserver;
...
private void sendRequest(String message) {
requestObserver.onNext(HelloRequest.newBuilder.setMessage(message).build());
}
public void start() {
stub.helloWorld(new StreamObserver<HelloResponse> {
@Override
public void onNext(HelloResponse response) {
sendRequest("hello from client");
// Optionally you can call onCompleted() or onError() on 
// the requestObserver to terminate the call.
}
@Override
public void onCompleted() {
// You should not call any method on requestObserver.
}
@Override
public void onError(Throwable error) {
// You should not call any method on requestObserver.
}
});
}
}

它与start()方法无关。

文档还提到你不应该做这样的事情

try {
requestObserver.onCompleted();
} catch(RuntimeException e) {
requestObserver.onError();
}

它主要用于用户自己的StreamObserver实现。StreamObserver返回的 gRPC 从不投掷。

我提取了一个用于GRPC流的模板,该模板抽象了许多GRPC样板,这些样板也解决了onError的逻辑。 在DechunkingStreamObserver

我使用以下通用模式进行GRPC流,类似于

META DATA DATA DATA META DATA DATA DATA

我将使用它的一个例子是采用一种形式并将其转换为另一种形式。

message SavedFormMeta {
string id = 1;
}
message SavedFormChunk {
oneof type {
SavedFormMeta meta = 1;
bytes data = 2;
}
}
rpc saveFormDataStream(stream SavedFormChunk) returns (stream SavedFormChunk) {}

我使用了一个标志来跟踪inError状态,以防止进一步处理并捕获onNextonComplete上的异常,我重定向到onError将错误转发到服务器端。

下面的代码提取 GRPC 语义并采用执行处理的 lamda。


/**
* Dechunks a GRPC stream from the request and calls the consumer when a complete object is created.  This stops
* further processing once an error has occurred.
*
* @param <T> entity type
* @param <R> GRPC chunk message type
* @param <S> GRPC message type for response streams
*/
class DechunkingStreamObserver<T, R, S> implements StreamObserver<R> {
/**
* This function takes the current entity state and the chunk and returns a copy of the combined result.  Note the combiner may modify the existing data, but may cause unexpected behaviour.
*/
private final BiFunction<T, R, T> combiner;
/**
* A function that takes in the assembled object and the GRPC response observer.
*/
private final BiConsumer<T, StreamObserver<S>> consumer;
/**
* Predicate that returns true if it is a meta chunk indicating a start of a new object.
*/
private final Predicate<R> metaPredicate;
/**
* this function gets the meta chunk and supplies a new object.
*/
private final Function<R, T> objectSupplier;
/**
* GRPC response observer.
*/
private final StreamObserver<S> responseObserver;
/**
* Currently being processed entity.
*/
private T current = null;
/**
* In error state.  Starts {@code false}, but once it is set to {@code true} it stops processing {@link #onNext(Object)}.
*/
private boolean inError = false;
/**
* @param metaPredicate    predicate that returns true if it is a meta chunk indicating a start of a new object.
* @param objectSupplier   this function gets the meta chunk and supplies a new object
* @param combiner         this function takes the current entity state and the chunk and returns a copy of the combined result.  Note the combiner may modify the existing data, but may cause unexpected behaviour.
* @param consumer         a function that takes in the assembled object and the GRPC response observer.
* @param responseObserver GRPC response observer
*/
DechunkingStreamObserver(
final Predicate<R> metaPredicate,
final Function<R, T> objectSupplier,
final BiFunction<T, R, T> combiner,
final BiConsumer<T, StreamObserver<S>> consumer,
final StreamObserver<S> responseObserver) {
this.metaPredicate = metaPredicate;
this.objectSupplier = objectSupplier;
this.combiner = combiner;
this.consumer = consumer;
this.responseObserver = responseObserver;
}
@Override
public void onCompleted() {
if (inError) {
return;
}
try {
if (current != null) {
consumer.accept(current, responseObserver);
}
responseObserver.onCompleted();
} catch (final Exception e) {
onError(e);
}
}
@Override
public void onError(final Throwable throwable) {
responseObserver.onError(throwable);
inError = true;
}
@Override
public void onNext(final R chunk) {
if (inError) {
return;
}
try {
if (metaPredicate.test(chunk)) {
if (current != null) {
consumer.accept(current, responseObserver);
}
current = objectSupplier.apply(chunk);
} else {
current = combiner.apply(current, chunk);
}
} catch (final Exception e) {
onError(e);
}
}
}

我有 4 个喇嘛

  • Predicate<R> metaPredicate它接收一个块并返回该块是否是元。
  • Function<R, T> objectSupplier它接收元块并创建一个由模块使用的新对象。
  • BiFunction<T, R, T> combiner,它接收数据块和当前对象,并返回包含组合的新对象。
  • BiConsumer<T, StreamObserver<S>> consumer,这将使用已完成的对象。 在发送新对象作为响应的情况下,它还会传入流观察器。

您唯一想做的是在调用responseObserver.onError();后标记为return,如下所示。 因为发送错误后无事可做。

if(condition){
responseObserver.onError(StatusProto.toStatusException(status));
//this is the required part
return;
}else{
responseObserver.onComplete(DATA);
}

最新更新