如何在不使用其他状态变量的情况下跟踪发布/完成的请求?(Java/grpc)



我正在使用grpc-java项目中的streamobserver类来设置一些双向流。

运行程序时,我向服务器提出了一个不确定的请求,并且一旦我完成了所有请求,我才想在请求observer上拨打oncompleted((。

目前,为了解决这个问题,我正在使用一个可变的"机上"来跟踪已发出的请求,并且当响应返回时,我会减少"机上"。所以,这样的东西。

// issuing requests
while (haveRequests) {
    MessageRequest request = mkRequest();
    this.requestObserver.onNext(request);
    this.inFlight++; 
}

// response observer
StreamObserver<Message> responseObserver = new StreamObserver<Message> {
    @Override
    public void onNext(Message response) {
        if (--this.onFlight == 0) {
            this.requestObserver.onCompleted();
        }
        // work on message
    }
    // other methods
}

有点伪代码,但是此逻辑有效。但是,如果可能的话,我想摆脱"机上"变量。StreamObserver类中是否有任何允许这种功能的东西,而无需额外的变量跟踪状态?可以说明发出的请求数量以及完成何时完成的内容。

我尝试检查Intellij IDE调试器中的对象,但没有任何弹出的内容。

要回答您的直接问题,您只需从while循环调用oncomplete即可。所有消息传递给onNext。在引擎盖下,GRPC将发送所谓的"半近距离",表明它不会再发送任何消息,但愿意接收它们。具体:

// issuing requests
while (haveRequests) {
    MessageRequest request = mkRequest();
    this.requestObserver.onNext(request);
    this.inFlight++; 
}
requestObserver.onCompleted();

这可以确保发送所有响应,并按照您发送的顺序发送。在服务器端,当它看到相应的onCompleted回调时,它可以通过在其观察者上调用oncomplete来半将连接的一侧关闭。(服务器端有两个观察员,可从客户端接收信息,一个用于发送信息(。

回到客户端,您只需要等待服务器关闭一半即可知道所有消息均已接收并处理。请注意,如果有任何错误,您将获得onError回调。

如果您不知道您要在客户端提出多少请求,则可以考虑使用AtomicInteger,并在回复响应时致电decrementAndGet。如果返回值为0,您将知道所有请求已经完成。

最新更新