同步和异步gRPC之间的区别



我正在开发一个基于gRPC的服务,它需要高吞吐量。但目前我的程序在使用C++同步gRPC时吞吐量较低。

我已经阅读了gRPC文档,但没有找到关于同步/异步API之间差异的明确解释。除了async可以控制完成队列,同时它对同步API是透明的。

我想知道同步gRPC是否向TCP层发送消息;ack";,那么下一条消息会被阻止吗?同时异步API会异步发送它们,而不会等待后面的消息?

TLDR:是的,异步API将异步发送消息,而不会等待后面的消息,而同步API将在发送/接收一条消息时阻塞整个线程

gRPC使用CompletionQueue进行异步操作。你可以在这里找到官方教程:https://grpc.io/docs/languages/cpp/async/

CompletionQueue是一个事件队列"事件";这里可以是请求数据接收的完成或警报(定时器)的到期等(基本上,任何异步操作的完成。)

以官方的gRPC异步API示例为例,重点介绍CallData类和HandleRpcs():

void HandleRpcs() {
// Spawn a new CallData instance to serve new clients.
new CallData(&service_, cq_.get());
void* tag;  // uniquely identifies a request.
bool ok;
while (true) {
// Block waiting to read the next event from the completion queue. The
// event is uniquely identified by its tag, which in this case is the
// memory address of a CallData instance.
// The return value of Next should always be checked. This return value
// tells us whether there is any kind of event or cq_ is shutting down.
GPR_ASSERT(cq_->Next(&tag, &ok));
GPR_ASSERT(ok);
static_cast<CallData*>(tag)->Proceed();
}
}

HandleRpcs()是服务器的主循环。它是一个无限循环,通过使用cq->Next()从完成队列中连续获取下一个事件,并调用它的Proceed()方法(我们的自定义方法,用于处理不同状态的客户端请求)。

CallData类(其实例表示客户端请求的完整处理周期):

class CallData {
public:
// Take in the "service" instance (in this case representing an asynchronous
// server) and the completion queue "cq" used for asynchronous communication
// with the gRPC runtime.
CallData(Greeter::AsyncService* service, ServerCompletionQueue* cq)
: service_(service), cq_(cq), responder_(&ctx_), status_(CREATE) {
// Invoke the serving logic right away.
Proceed();
}
void Proceed() {
if (status_ == CREATE) {
// Make this instance progress to the PROCESS state.
status_ = PROCESS;
// As part of the initial CREATE state, we *request* that the system
// start processing SayHello requests. In this request, "this" acts are
// the tag uniquely identifying the request (so that different CallData
// instances can serve different requests concurrently), in this case
// the memory address of this CallData instance.
service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_,
this);
} else if (status_ == PROCESS) {
// Spawn a new CallData instance to serve new clients while we process
// the one for this CallData. The instance will deallocate itself as
// part of its FINISH state.
new CallData(service_, cq_);
// The actual processing.
std::string prefix("Hello ");
reply_.set_message(prefix + request_.name());
// And we are done! Let the gRPC runtime know we've finished, using the
// memory address of this instance as the uniquely identifying tag for
// the event.
status_ = FINISH;
responder_.Finish(reply_, Status::OK, this);
} else {
GPR_ASSERT(status_ == FINISH);
// Once in the FINISH state, deallocate ourselves (CallData).
delete this;
}
}
private:
// The means of communication with the gRPC runtime for an asynchronous
// server.
Greeter::AsyncService* service_;
// The producer-consumer queue where for asynchronous server notifications.
ServerCompletionQueue* cq_;
// Context for the rpc, allowing to tweak aspects of it such as the use
// of compression, authentication, as well as to send metadata back to the
// client.
ServerContext ctx_;
// What we get from the client.
HelloRequest request_;
// What we send back to the client.
HelloReply reply_;
// The means to get back to the client.
ServerAsyncResponseWriter<HelloReply> responder_;
// Let's implement a tiny state machine with the following states.
enum CallStatus { CREATE, PROCESS, FINISH };
CallStatus status_;  // The current serving state.
};

正如我们所看到的,CallData有三种状态:CREATE、PROCESS和FINISH。

请求例程如下所示:

  1. 启动时,为未来的传入客户端预先分配一个调用数据
  2. 在构造该CallData对象的过程中,service_->RequestSayHello(&ctx_, &request_, &responder_, cq_, cq_, this)被调用,这告诉gRPC准备接收恰好一个SayHello请求
    此时,我们不知道请求将从哪里来,也不知道它何时来,我们只是告诉gRPC,当请求真正到达时,我们已经准备好处理它,并让gRPC在它发生时通知我们
    RequestSayHello的参数告诉gRPC在收到请求后将请求的上下文、请求主体和响应方放在哪里,以及通知使用哪个完成队列以及通知事件应附加哪些标记(在这种情况下,this用作标记)
  3. CCD_ 11在CCD_。正在等待事件发生

一段时间后

  1. 客户端向服务器发出SayHello请求,gRPC开始接收并解码该请求。(IO操作)

一段时间后

  1. gRPC已完成接收请求。它将请求体放入CallData对象的request_字段(通过前面提供的指针),然后创建一个事件(使用the pointer to the CallData object作为标记,正如前面RequestSayHello的最后一个参数所要求的那样)。gRPC然后将该事件放入完成队列cq_
  2. HandleRpcs()中的循环接收到事件(以前阻止的对cq->Next()的调用现在返回),调用CallData::Proceed()来处理请求
  3. CallData的status_PROCESS,因此它执行以下操作:
    6.1。创建一个新的CallData对象,以便可以处理此对象之后的新客户端请求
    6.2。生成请求的答复,告诉gRPC我们已完成处理,请将答复发送回客户端
    6.3gRPC开始传输回复。(IO操作)
    6.4HandleRpcs()中的循环进入下一次迭代,并再次阻塞cq->Next(),等待新事件发生

一段时间后

  1. gRPC已经完成了回复的传输,并告诉我们,通过再次将事件放入完成队列,并将指向CallData的指针作为标记
  2. cq->Next()接收事件并返回CallData::Proceed()解除分配CallData对象(使用delete this;)。HandleRpcs()cq->Next()上再次循环和阻塞,等待新的事件

看起来这个过程与synchonous API基本相同,只是对完成队列有额外的访问权限。然而,通过这样做,在每个some time later....(通常等待IO操作完成或等待请求发生),cq->Next()实际上不仅可以接收该请求的操作完成事件,还可以接收其他请求的操作结束事件。

因此,如果在第一个请求等待应答数据传输完成时,有一个新请求进入,cq->Next()将获得新请求发出的事件,并立即同时开始处理新请求,而不是等待第一个请求完成传输

另一方面,同步API将始终等待一个请求的完全完成(从开始接收到完成回复),然后再开始接收另一个请求。这意味着在接收请求正文数据和发回回复数据(IO操作)时,CPU利用率接近0%。本来可以用来处理其他请求的宝贵CPU时间浪费在了等待上。

这真的很糟糕,因为如果一个互联网连接不好的客户端(往返100毫秒)向服务器发送了一个请求,我们将不得不为来自该客户端的每个请求花费至少200毫秒的时间来积极等待TCP传输完成。这将使我们的服务器性能降低到每秒只有5个请求。

而如果我们使用异步API,我们就不会主动等待任何事情。我们告诉gRPC:";请将这些数据发送给客户,但我们不会等待您在这里完成。相反,完成后只需在完成队列中放入一个小字母,我们稍后会进行检查"并继续处理其他请求。

相关信息

您可以看到一个简单的服务器是如何为同步API和异步API 编写的

最佳性能实践

gRPC C++性能节点建议的最佳性能实践是生成与CPU核心数相等的线程数量,并为每个线程使用一个CompletionQueue。

最新更新