我已经在Android应用程序中设置了一个双向流结构,我目前正在使用该机制来发送大文件块。 我遇到的问题是我的应用程序将收到文件的请求消息,然后我将使用可能价值数百 MB 的 GRPC 消息进行响应,这经常导致 OOM。 伪代码:
public class Myclass implements StreamObserver<CameraRequest>, Closeable {
...
public void onNext(Request req) {
for (Chunk chunk : getChunks(req))
this.requestObserver.onNext(builder.setChunk(chunk).build());
}
...
}
有没有一些好的方法可以根据实际放在线路上的内容(以及相应的内存可用(来限制对onNext的未完成调用的数量? IE 只允许对 onNext 进行 10 次调用,然后后续调用阻止,直到底层协议栈成功发送前面调用的数据? 我可以在我的有线协议TCP风格中实现一个完整的e2e确认窗口,但希望有一个更简单/内置的技术其他人正在使用。
谢谢!
将requestObserver
投射到ClientCallStreamObserver
.然后,您可以致电clientCallStreamObserver.isReady()
检查是否应停止发送。
然后,您将需要通知 RPC 何时准备好接收更多消息,以恢复发送。为此,请在beforeStart()
内实现ClientResponseObserver
并调用clientCallStreamObserver.setOnReadyHandler(Runnable)
。
把这些放在一起,你会得到这样的结果:
public class MyClass implements
ClientResponseObserver<CameraRequest,CameraResponse> {
private ClientCallStreamObserver<CameraRequest> requestObserver;
private Iterable<Chunk> chunks;
public void beforeStart(ClientCallStreamObserver<CameraRequest> requestObserver) {
this.requestObserver = requestObserver;
requestObserver.setOnReadyHandler(MyClass::drain);
}
public void onNext(CameraRequest req) {
// I don't know if this assert valid for your protocol
assert chunks == null || !chunks.hasNext();
chunks = getChunks(req);
drain();
}
public void drain() {
while (requestObserver.isReady() && chunks.hasNext()) {
Chunk chunk = chunks.next();
requestObserver.onNext(builder.setChunk(chunk).build());
}
}
...
}
<</div>
div class="one_answers"> 您可以在此处查看流控制示例。