我正在使用grpc-java项目中的streamobserver类来设置一些双向流。
运行程序时,我向服务器提出了一个不确定的请求,并且一旦我完成了所有请求,我才想在请求observer上拨打oncompleted((。
。目前,为了解决这个问题,我正在使用一个可变的"机上"来跟踪已发出的请求,并且当响应返回时,我会减少"机上"。所以,这样的东西。
// issuing requests
while (haveRequests) {
MessageRequest request = mkRequest();
this.requestObserver.onNext(request);
this.inFlight++;
}
// response observer
StreamObserver<Message> responseObserver = new StreamObserver<Message> {
@Override
public void onNext(Message response) {
if (--this.onFlight == 0) {
this.requestObserver.onCompleted();
}
// work on message
}
// other methods
}
有点伪代码,但是此逻辑有效。但是,如果可能的话,我想摆脱"机上"变量。StreamObserver类中是否有任何允许这种功能的东西,而无需额外的变量跟踪状态?可以说明发出的请求数量以及完成何时完成的内容。
我尝试检查Intellij IDE调试器中的对象,但没有任何弹出的内容。
要回答您的直接问题,您只需从while循环调用oncomplete即可。所有消息传递给onNext
。在引擎盖下,GRPC将发送所谓的"半近距离",表明它不会再发送任何消息,但愿意接收它们。具体:
// issuing requests
while (haveRequests) {
MessageRequest request = mkRequest();
this.requestObserver.onNext(request);
this.inFlight++;
}
requestObserver.onCompleted();
这可以确保发送所有响应,并按照您发送的顺序发送。在服务器端,当它看到相应的onCompleted
回调时,它可以通过在其观察者上调用oncomplete来半将连接的一侧关闭。(服务器端有两个观察员,可从客户端接收信息,一个用于发送信息(。
回到客户端,您只需要等待服务器关闭一半即可知道所有消息均已接收并处理。请注意,如果有任何错误,您将获得onError
回调。
如果您不知道您要在客户端提出多少请求,则可以考虑使用AtomicInteger
,并在回复响应时致电decrementAndGet
。如果返回值为0
,您将知道所有请求已经完成。