为什么gRPC客户端取消仅在客户端和服务器都在同一进程上运行时有效



我有一个异步服务器,它将数据流式传输到单个异步客户端。我希望能够取消客户端的流式传输,并让服务器停止流式传输。

目前,我在本地Windows 10机器上的两个独立进程上运行客户端和服务器。但我试过在一台单独的机器上运行客户端,它的行为是一样的。

我的服务器端端点配置如下:

const auto server_grpc_port = "50051";
const auto server_endpoint = std::string("0.0.0.0:") + server_grpc_port;
serverBuilder.AddListeningPort(server_endpoint, grpc::InsecureServerCredentials());

我的客户端端点配置如下:

const auto server_grpc_port = "50051";
const auto client_endpoint = std::string("localhost:") + server_grpc_port;
remoteStub = std::make_unique<MyRemoteApp::Stub>(grpc::CreateChannel(client_endpoint, grpc::InsecureChannelCredentials()));

在启动客户端和服务器之后,我启动一个异步服务器流。在某个时刻,我从客户端触发取消,这将导致客户端停止读取,服务器停止写入。我遵循这里的答案和这里的github问题中描述的方法:

服务器端

  1. 创建grpc::ServerContext实例
  2. 呼叫grpc::ServerContext::AsyncNotifyWhenDone(cancellation_tag)。一旦cancellation_tag将出现在完成队列上,我们就可以调用grpc::ServerContext::IsCancelled()来确定客户端是否取消了RPC
  3. 等待RPC流由客户端启动:服务器->RequestMyStreamingRPC(…令牌…(
  4. 每次token到达CompletionQueue时执行一系列写入操作
  5. 如果cancellation_tag到达CompletionQueue,则我们停止流传输

客户端

  1. 创建grpc::ClientContext实例
  2. 启动RPC-Stub::PrepareAsync<>
  3. 如果我们希望从服务器接收数据,可以多次调用reader->Read
  4. 在某个时刻,调用grpc::ClientContext::TryCancel()
  5. 我们调用reader->Finish,它返回一个CANCELLED状态
  6. 销毁CCD_ 15实例和读取器

但是,cancellation_tag从未到达服务器。只有当我销毁客户端的Stub实例时,我才能最终在服务器的CompletionQueue上接收到cancellation_tag。如果我让存根保持活动,服务器就会永远保持流式数据,就好像有客户端在读取它一样

经过进一步研究,当客户端和服务器在同一进程上运行时,或者当我实现一个简单的同步服务器时,似乎都不会出现问题。在这些情况下,取消工作如预期。

那么,可能出了什么问题呢?异步服务器处理取消的方式是否有问题?

经过进一步调查,我认为我发现了问题。这似乎是关于CompletionQueue行为的一些未记录的方面。

我在整个服务器程序中使用了一个线程。因此,完成处理程序是在调用AsyncNext的同一线程上调用的。像这样:

while(server_active)
{
status = _completionQueue->AsyncNext(&tag, &ok);
if (status == grpc::CompletionQueue::GOT_EVENT)
{
CallHandler(tag, ok); // does the logic, probably another write
}
}

每当一次写入完成时,它就会立即触发另一次写入。

看起来,当客户端触发取消时,相关的完成标记实际上已经插入到队列中,但排出循环从未到达它,因为它不断添加更多的写入完成。这就好像队列的行为是以后进先出的方式进行的。

当我修改循环以首先排出队列,然后调用处理程序时,我立即得到了预期的行为。

while(server_active)
{
std::vector<Completions> completions;
while(1) // drain the queue
{   
status = _completionQueue->AsyncNext(&tag, &ok);
if (status == grpc::CompletionQueue::GOT_EVENT)
{
completions.emplace_back(tag, ok);
}
else
{
break;
}
} 
for (auto completion : completions)
{
CallHandler(completion.tag, completion.ok); // does the logic
}
}

相关内容

  • 没有找到相关文章

最新更新