我有一个异步服务器,它将数据流式传输到单个异步客户端。我希望能够取消客户端的流式传输,并让服务器停止流式传输。
目前,我在本地Windows 10机器上的两个独立进程上运行客户端和服务器。但我试过在一台单独的机器上运行客户端,它的行为是一样的。
我的服务器端端点配置如下:
const auto server_grpc_port = "50051";
const auto server_endpoint = std::string("0.0.0.0:") + server_grpc_port;
serverBuilder.AddListeningPort(server_endpoint, grpc::InsecureServerCredentials());
我的客户端端点配置如下:
const auto server_grpc_port = "50051";
const auto client_endpoint = std::string("localhost:") + server_grpc_port;
remoteStub = std::make_unique<MyRemoteApp::Stub>(grpc::CreateChannel(client_endpoint, grpc::InsecureChannelCredentials()));
在启动客户端和服务器之后,我启动一个异步服务器流。在某个时刻,我从客户端触发取消,这将导致客户端停止读取,服务器停止写入。我遵循这里的答案和这里的github问题中描述的方法:
服务器端
- 创建
grpc::ServerContext
实例 - 呼叫
grpc::ServerContext::AsyncNotifyWhenDone(cancellation_tag)
。一旦cancellation_tag
将出现在完成队列上,我们就可以调用grpc::ServerContext::IsCancelled()
来确定客户端是否取消了RPC - 等待RPC流由客户端启动:服务器->RequestMyStreamingRPC(…令牌…(
- 每次
token
到达CompletionQueue
时执行一系列写入操作 - 如果
cancellation_tag
到达CompletionQueue
,则我们停止流传输
客户端
- 创建
grpc::ClientContext
实例 - 启动RPC-
Stub::PrepareAsync<>
- 如果我们希望从服务器接收数据,可以多次调用
reader->Read
- 在某个时刻,调用
grpc::ClientContext::TryCancel()
- 我们调用
reader->Finish
,它返回一个CANCELLED
状态 - 销毁CCD_ 15实例和读取器
但是,cancellation_tag
从未到达服务器。只有当我销毁客户端的Stub实例时,我才能最终在服务器的CompletionQueue上接收到cancellation_tag
。如果我让存根保持活动,服务器就会永远保持流式数据,就好像有客户端在读取它一样
经过进一步研究,当客户端和服务器在同一进程上运行时,或者当我实现一个简单的同步服务器时,似乎都不会出现问题。在这些情况下,取消工作如预期。
那么,可能出了什么问题呢?异步服务器处理取消的方式是否有问题?
经过进一步调查,我认为我发现了问题。这似乎是关于CompletionQueue行为的一些未记录的方面。
我在整个服务器程序中使用了一个线程。因此,完成处理程序是在调用AsyncNext的同一线程上调用的。像这样:
while(server_active)
{
status = _completionQueue->AsyncNext(&tag, &ok);
if (status == grpc::CompletionQueue::GOT_EVENT)
{
CallHandler(tag, ok); // does the logic, probably another write
}
}
每当一次写入完成时,它就会立即触发另一次写入。
看起来,当客户端触发取消时,相关的完成标记实际上已经插入到队列中,但排出循环从未到达它,因为它不断添加更多的写入完成。这就好像队列的行为是以后进先出的方式进行的。
当我修改循环以首先排出队列,然后调用处理程序时,我立即得到了预期的行为。
while(server_active)
{
std::vector<Completions> completions;
while(1) // drain the queue
{
status = _completionQueue->AsyncNext(&tag, &ok);
if (status == grpc::CompletionQueue::GOT_EVENT)
{
completions.emplace_back(tag, ok);
}
else
{
break;
}
}
for (auto completion : completions)
{
CallHandler(completion.tag, completion.ok); // does the logic
}
}