异步C++客户端中CompletionQueue的困惑



关于如何将CompletionQueue用于异步C++客户端,我有一些困惑。我的服务器是C#,所以我在这里的问题纯粹是关于客户端以异步方式向服务器发送请求。

作为参考,以下是我如何设置客户端代码以进行异步请求:

template<typename ResponseType, typename AsyncOpExecutor>
bool DoAsyncOp(const AsyncOpExecutor& op, const unsigned int deadlineMs, ResponseType& response)
{
grpc::ClientContext ctx;
grpc::CompletionQueue queue;
const std::unique_ptr<grpc::ClientAsyncResponseReaderInterface<ResponseType>> asyncOpResponse = op(ctx, queue);
grpc::Status status;
int requestTag = 1;
asyncOpResponse->Finish(&response, &status, (void*)requestTag);
bool result = false;
bool gotEvent = false;
do
{
const std::chrono::time_point<std::chrono::system_clock> deadline = std::chrono::system_clock::now() + std::chrono::milliseconds(deadlineMs);
void* got_tag;
bool ok = false;
const grpc::CompletionQueue::NextStatus nextStatus = queue.AsyncNext(&got_tag, &ok, deadline);
switch (nextStatus)
{
case grpc::CompletionQueue::NextStatus::TIMEOUT:
continue;
case grpc::CompletionQueue::NextStatus::GOT_EVENT:
assert(got_tag == (void*)requestTag);
// ok is always true even if I close the server while request is in progress.
assert(ok);
result = status.ok();
gotEvent = true;
break;
// Given that I am creating a new CompletionQueue per request (not using a shared one), is this flag likely to occur?
case grpc::CompletionQueue::NextStatus::SHUTDOWN:
result = false;
gotEvent = false;
break;
default:
result = false;
gotEvent = false;
break;
}
} while (!gotEvent);
return result;
}

我的第一个困惑是关于设置CompletionQueue的最佳方式。这个答案似乎表明单个CompletionQueue可以跨请求使用。如果多个线程使用同一队列进行请求,这将如何表现。假设我将上面的代码更改为使用共享队列,而不是为每个请求创建一个新的队列。

  • 一个线程如何知道它收到的响应是针对它的,而不是针对另一个线程的?

  • 我是否需要为每个线程分配一个唯一的标记,并在每个线程上检查从队列接收的标记是否与我最初发送的标记匹配?

  • 如果线程A收到了一个针对线程B的标记,这是否意味着线程B稍后可以查询其标记,还是因为线程A首先看到了该标记而丢失了该标记?

  • 对于每个请求使用新队列而不是共享,是否真的存在重大问题?

我的第二个困惑是关于grpc::CompletionQueue::NextStatus::SHUTDOWN的结果。如果我为每个请求使用一个新的队列,并且没有在队列上显式调用shutdown,那么这种结果可能会发生吗?如果是,是什么触发它?我执行的一个测试是在请求进行时关闭服务器,但我没有得到关闭结果,而是得到了grpc::CompletionQueue::NextStatus::GOT_EVENT结果,状态设置为UNAVAILABLE

我最后的困惑是关于ok标志。我已经读过这个答案,但是它仍然不是很清楚。给定上面发布的用例和代码,如果我从队列中得到的结果是grpc::CompletionQueue::NextStatus::GOT_EVENT,那么ok标志会是false吗?如果是,是什么导致它是false?同样,这纯粹是关于客户端的,而不是CompletionQueue在服务器上的处理方式。

  1. 使用从完成队列返回的标记来知道是哪个完成队列操作启动了此操作。您必须确保飞行中没有两个CQ操作同时使用同一标签
  2. 飞行中的每个CQ操作同时需要一个不同的标签。无论这些操作是从同一个线程还是从不同的线程启动,这都适用
  3. 每个标记只从Next函数中出现一次。如果在同一个CompletionQueue上有多个线程调用Next,那么它们都应该能够处理(或传递(注册到该CQ的任何标记
  4. 您可以为每个RPC使用一个新队列,就像同步API实际上在内部所做的那样。在这种情况下,它的效率较低,并且基本上移交给了sync API,因为您不可能同时拥有许多未完成的操作

您需要在某个时刻调用Shutdown,然后排出队列。这是API的标准。否则,特别是对于服务器CQ,很可能会发生泄漏。所谓drain,我的意思是调用Next,直到它返回false(或者AsyncNext,直到返回SHUTDOWN(。

对于流式调用、服务器端调用等,GOT_EVENT的ok可以为false。如果您只查看客户端一元调用(如您的示例(,则不会为false。假ok本质上意味着RPC的这一侧被破坏。这方面的文档在CQ.的头文件中

最新更新