例如,您可以使用以下代码接收双向流式传入消息。
public class DataService extends DataServiceGrpc.DataServiceImplBase {
@Override
public StreamObserver<DataReq> send(StreamObserver<DataResp> responseObserver) {
return new StreamObserver<DataReq>() {
@Override
public void onNext(DataReq value) {
}
@Override
public void onError(Throwable t) {
}
@Override
public void onCompleted() {
}
};
}
}
并说我有一些其他类似的类来捕获即将到来的gRPC消息中的每一个。
如果我想在这些类接收到所有传入消息之前捕获并控制它们,那么实现目标的标准方法是什么?
StreamObserver
是存根的一部分,所以不是。但是您可以制作一个ServerInterceptor
并将其注册到serverBuilder.intercept()
。拦截器可以通过包装ServerCall.Listener.来查看每个传入消息
class MyInterceptor implements ServerInterceptor {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT,RespT> call, Metadata headers, ServerCallHandler<ReqT,RespT> next)
return new SimpleForwardingServerCallListener<>(next.startCall(call, headers)) {
@Override public void onMessage(ReqT message) {
// your code here
super.onMessage(message); // call into the application
}
};
}
}
CCD_ 4最终将成为另一个拦截器或应用程序。生成的代码(例如,DataServiceImplBase
(实现ServerCallHandler
,并使其调用应用程序(例如,DataService.send()
(。因此,这个拦截器可以在应用程序运行之前运行。