如何将 grpc c++ ClientAsyncReader<Message> 用于服务器端流



我使用一个非常简单的原型,其中消息只包含1个字符串字段。像这样:

service LongLivedConnection {  
// Starts a grpc connection
rpc Connect(Connection) returns (stream Message) {}
}
message Connection{
string userId = 1;
}
message Message{
string serverMessage = 1;
}

用例是客户端应该连接到服务器,服务器将使用此grpc来推送消息。

现在,对于客户端代码,假设我已经在一个工作线程中,我如何正确地设置它,以便我可以在随机时间连续接收来自服务器的消息?

void StartConnection(const std::string& user) {
Connection request;
request.set_userId(user);
Message message;
ClientContext context;
stub_->Connect(&context, request, &reply);
// What should I do from now on? 
// notify(serverMessage);
}
void notify(std::string message) {
// generate message events and pass to main event loop
}

我弄清楚了如何使用api。看起来很灵活,但仍然有点奇怪,因为我通常只期望异步api接收某种lambda回调。

下面的代码是阻塞的,你必须在不同的线程中运行它,这样它就不会阻塞你的应用程序。

我相信你可以有多个线程访问CompletionQueue,但在我的情况下,我只有一个线程处理这个grpc连接。

GrpcConnection.h file:
public:
void StartGrpcConnection();
private:
std::shared_ptr<grpc::Channel> m_channel;
std::unique_ptr<grpc::ClientReader<push_notifications::Message>> m_reader;
std::unique_ptr<push_notifications::PushNotificationService::Stub> m_stub;
GrpcConnection.cpp files:
...
void GrpcConnectionService::StartGrpcConnection()
{
m_channel = grpc::CreateChannel("localhost:50051",grpc::InsecureChannelCredentials());
LongLiveConnection::Connect request;
request.set_user_id(12345);
m_stub = LongLiveConnection::LongLiveConnectionService::NewStub(m_channel);

grpc::ClientContext context;
grpc::CompletionQueue cq;
std::unique_ptr<grpc::ClientAsyncReader<LongLiveConnection::Message>> reader =
m_stub->PrepareAsyncConnect(&context, request, &cq);
void* got_tag;
bool ok = false;
LongLiveConnection::Message reply;
reader->StartCall((void*)1);
cq.Next(&got_tag, &ok);
if (ok && got_tag == (void*)1)
{
// startCall() is successful if ok is true, and got_tag is void*1
// start the first read message with a different hardcoded tag
reader->Read(&reply, (void*)2);
while (true)
{
ok = false;
cq.Next(&got_tag, &ok);
if (got_tag == (void*)2)
{
// this is the message from server
std::string body = reply.server_message();
// do whatever you want with body, in my case i push it to my applications' event stream to be processed by other components

// lastly, initialize another read
reader->Read(&reply, (void*)2);
}
else if (got_tag == (void*)3)
{
// if you do something else, such as listening to GRPC channel state change, in your call, you can pass a different hardcoded tag, then, in here, you will be notified when the result is received from that call.
}
}
}
}

最新更新