
我从一个数据源获取流式数据,在处理结束之前无法知道这些数据的总大小,但至少有 10 GB。我必须使用 gRPC 发送如此大量的数据。

需要说明的是,这些大量数据要等 streaming 处理完成后才会通过 gRPC 发送。在这一步中,我打算把所有值存储在一个 vector 中。

  • 这里有人建议不要使用 gRPC 传递大数据,而改用其他消息协议;但我受到限制,只能使用 gRPC,不能使用其他协议(至少到目前为止是这样)。
  • 我试图从这篇帖子中了解如何发送 chunk message,但不确定它是否与我的问题相关。
  • 在第一篇帖子中,我找到了一篇使用 Go 语言流式传输数据的博客。
  • 这是一个使用 Python 语言的演示,但它同样不完整。
  • gRPC 官方示例可能是一个不错的起点,但由于我缺乏 C++ 知识,无法看懂它。


  • 项目中已经存在一个 unary RPC。

  • 我知道新的 bi-directional(双向)RPC 需要一些时间才能完成。我希望 unary RPC 不必等待 bi-directional RPC 完成。目前我想到的是同步(synchronous)方式:unary RPC 会一直等待,直到流式传输完成并拿到其 status 之后才继续。

  • 我省略了 C++ 代码中不必要的行,但给出了完整的 proto 文件。

  • big_data.proto

syntax = "proto3";
package demo_grpc;

// One chunk of a large int32 data set, used in both directions of Stream_Chunk_Service.
message Large_Data {
  // Values carried by this chunk. (Repeated scalar numeric fields are packed by default in
  // proto3; the explicit option is kept for clarity.)
  repeated int32 large_data_collection = 1 [packed=true];
  // Position of this chunk within the stream, set by the sender.
  int32 data_chunk_number = 2;
}
  • addressbook.proto
syntax = "proto3";
package demo_grpc;
import "myproto/big_data.proto";

// Reply of the unary GetAddress RPC.
message S_Response {
  string name     = 1;
  string street   = 2;
  string zip      = 3;
  string city     = 4;
  string country  = 5;
  int32 double_init_val = 6;
}

// Request of the unary GetAddress RPC.
message C_Request {
  uint32 choose_area = 1;
  string name = 2;
  int32 init_val = 3;
}

service AddressBook {
  // Unary RPC: one request, one response.
  rpc GetAddress(C_Request) returns (S_Response) {}
  // Bi-directional streaming RPC: the client streams Large_Data chunks, the server streams chunks back.
  rpc Stream_Chunk_Service(stream Large_Data) returns (stream Large_Data) {}
}
  • client.cpp
#include <algorithm>
#include <future>
#include <iostream>
#include <numeric>

#include <grpcpp/create_channel.h>
#include <grpcpp/grpcpp.h>

#include <addressbook.grpc.pb.h>
#include <big_data.pb.h>

using namespace std;
// Client_Request: prompts the user to set the value for the required area.
// `request_` is taken by non-const reference, presumably so the user's choices are stored in it
// (e.g. choose_area / name / init_val) -- TODO confirm against the full implementation.
// NOTE(review): the function body and its braces are omitted in this excerpt; as shown this is
// not a complete definition.
void Client_Request(demo_grpc::C_Request &request_)
// do processing for unary rpc. Intentionally avoided here
// Server_Response: according to the client's request, displays the values of the S_Response
// protobuf message returned by the unary GetAddress RPC.
// `request_` is presumably used to decide which fields to display -- TODO confirm.
// NOTE(review): the function body and its braces are omitted in this excerpt.
void Server_Response(demo_grpc::C_Request &request_, const demo_grpc::S_Response &response_)
// do processing for unary rpc. Intentionally avoided here
// Builds a (dummy) large int32 data set, sends it to the server chunk by chunk over the
// bi-directional Stream_Chunk_Service RPC, then reads back the chunks the server streams in reply.
//
// request_  : message reused for every outgoing chunk (its repeated field is cleared per chunk)
// response_ : message reused for every incoming chunk
// preferred_chunk_size_in_kibyte : payload size of one chunk in KiB; must be > 0
//
// All chunks are written (and WritesDone() is sent) before anything is read. This matches a server
// that reads the whole stream before replying; a server that replies per chunk could dead-lock on
// flow control with this client.
void Stream_Data_Chunk_Request(demo_grpc::Large_Data &request_,
                               demo_grpc::Large_Data &response_,
                               uint64_t preferred_chunk_size_in_kibyte)
{
    // A dummy vector which in the real case will be the large data set's container.
    // Placeholder values 0, 1, 2, ...
    std::vector<int32_t> large_vector(1024 * 10);
    std::iota(large_vector.begin(), large_vector.end(), 0);

    // Number of integers one chunk holds, and number of chunks (the last one may be partial).
    uint64_t preferred_chunk_size_in_kibyte_holds_integer_num = 0;
    const uint32_t total_chunk = total_chunk_counter(large_vector.size(), preferred_chunk_size_in_kibyte,
                                                     preferred_chunk_size_in_kibyte_holds_integer_num);
    const uint64_t per_chunk = preferred_chunk_size_in_kibyte_holds_integer_num;
    if (per_chunk == 0)
    {
        std::cout << "Chunk size must be greater than 0 KiB" << std::endl;
        return;
    }

    // The stream has to exist before the first chunk is built so every chunk can be written at once.
    std::string ip_address = "localhost:50051";
    auto channel = grpc::CreateChannel(ip_address, grpc::InsecureChannelCredentials());
    std::unique_ptr<demo_grpc::AddressBook::Stub> stub = demo_grpc::AddressBook::NewStub(channel);
    grpc::ClientContext context;
    std::unique_ptr<grpc::ClientReaderWriter<demo_grpc::Large_Data, demo_grpc::Large_Data>> stream(
        stub->Stream_Chunk_Service(&context));

    for (uint32_t chunk = 0; chunk < total_chunk; ++chunk)
    {
        const uint64_t first = chunk * per_chunk;
        const uint64_t last = std::min<uint64_t>(first + per_chunk, large_vector.size());
        // Start every chunk with an empty repeated field so earlier values are not re-sent.
        request_.clear_large_data_collection();
        for (uint64_t i = first; i < last; ++i)
            request_.add_large_data_collection(large_vector[i]);
        request_.set_data_chunk_number(static_cast<int32_t>(chunk));
        if (!stream->Write(request_))
            break;  // the stream is broken; Finish() below reports why
    }
    // Tell the server that no more chunks are coming so its Read() loop can end.
    stream->WritesDone();

    // Collect the processed data the server streams back.
    std::vector<int32_t> processed;
    processed.reserve(large_vector.size());
    while (stream->Read(&response_))
    {
        const auto &values = response_.large_data_collection();
        processed.insert(processed.end(), values.begin(), values.end());
    }

    const grpc::Status status = stream->Finish();
    if (status.ok())
        std::cout << "Received " << processed.size() << " values from the server" << std::endl;
    else
        std::cout << "Stream_Chunk_Service failed: " << status.error_message() << std::endl;
}
int main(int argc, char* argv[])
std::string client_address = "localhost:50051";
std::cout << "Address of client: " << client_address << std::endl;
// The following part for the Unary RPC
demo_grpc::C_Request query;
demo_grpc::S_Response result;
// This part for the streaming chunk data (Bi directional Stream RPC)
demo_grpc::Large_Data stream_chunk_request_;
demo_grpc::Large_Data stream_chunk_response_;
uint64_t preferred_chunk_size_in_kibyte = 64;
Stream_Data_Chunk_Request(stream_chunk_request_, stream_chunk_response_, preferred_chunk_size_in_kibyte);
// Call
auto channel = grpc::CreateChannel(client_address, grpc::InsecureChannelCredentials());
std::unique_ptr<demo_grpc::AddressBook::Stub> stub = demo_grpc::AddressBook::NewStub(channel);
grpc::ClientContext context;
grpc::Status status = stub->GetAddress(&context, query, &result);
// the following status is for unary rpc as far I have understood the structure
if (status.ok())
Server_Response(query, result);
std::cout << status.error_message() << std::endl;
return 0;
  • helper function total_chunk_counter
#include <cmath>
// Splits a container of int32 values into fixed-size chunks.
//
// num_of_container_content : number of int32 values to transfer (e.g. vec.size())
// preferred_chunk_size_in_kibyte : payload size of one chunk in KiB
// preferred_chunk_size_in_kibyte_holds_integer_num [out] : number of int32 values one chunk holds
// Returns the number of chunks needed; the last chunk may be partially filled.
// Returns 0 for an empty container or for a chunk size too small to hold one integer.
uint32_t total_chunk_counter(uint64_t num_of_container_content,
                             uint64_t preferred_chunk_size_in_kibyte,
                             uint64_t &preferred_chunk_size_in_kibyte_holds_integer_num)
{
    // An int32 occupies 4 bytes; 1 KiB = 1024 bytes.
    constexpr uint64_t bytes_per_integer = sizeof(int32_t);
    const uint64_t per_chunk = (preferred_chunk_size_in_kibyte * 1024) / bytes_per_integer;
    preferred_chunk_size_in_kibyte_holds_integer_num = per_chunk;
    if (per_chunk == 0)
        return 0;
    // Integer ceiling division: exact for every size, unlike float which loses precision above 2^24.
    const uint64_t chunks = num_of_container_content / per_chunk
                          + (num_of_container_content % per_chunk != 0 ? 1 : 0);
    return static_cast<uint32_t>(chunks);
}
  • server.cpp(尚未完成)
#include <myproto/big_data.pb.h>
#include <myproto/addressbook.grpc.pb.h>
#include <grpcpp/grpcpp.h>
#include <grpcpp/server_builder.h>
#include <iostream>
class AddressBookService final : public demo_grpc::AddressBook::Service {
virtual ::grpc::Status GetAddress(::grpc::ServerContext* context, const ::demo_grpc::C_Request* request, ::demo_grpc::S_Response* response)
switch (request->choose_area())
// do processing for unary rpc. Intentionally avoided here
std::cout << "Information of " << request->choose_area() << " is sent to Client" << std::endl;
return grpc::Status::OK;

// Bi-directional streaming chunk data
virtual ::grpc::Status Stream_Chunk_Service(::grpc::ServerContext* context, ::grpc::ServerReaderWriter< ::demo_grpc::Large_Data, ::demo_grpc::Large_Data>* stream)
// stream->Large_Data;
return grpc::Status::OK;
void RunServer()
std::cout << "grpc Version: " << grpc::Version() << std::endl;
std::string server_address = "localhost:50051";
std::cout << "Address of server: " << server_address << std::endl;
grpc::ServerBuilder builder;
builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
AddressBookService my_service;
std::unique_ptr<grpc::Server> server(builder.BuildAndStart());
int main(int argc, char* argv[])
return 0;


  • 我需要通过消息 Large_Data 的 repeated field large_data_collection 传递 large_vector 的内容。我应该把 large_vector 按 chunk 切分,并用每个 chunk 的数据填充 repeated field large_data_collection。
  • 在服务器端,所有收到的 large_data_collection 将按照 data_chunk_number 保存下来,然后对它们进行一些处理(例如:把每个元素的值翻倍)。之后,整个数据将再次以 chunk stream 的形式发送回 client。
  • 如果当前的 unary RPC 不必等待 bi-directional RPC 完成,那就太好了。



  • 为了传递大量数据,我使用了一个大小可变的大 vector。
  • 我计算了 vector 中的样本数量,选定了块(chunk)的大小,并计算出每个块能容纳多少个整数。
  • 最后,按照块的数量逐个传递数据。
  • 我设定的一个限制是:块大小(以 KB 为单位)必须是 64 的倍数。
  • 完整的解决方案在我的 git 仓库中,这里只链接了合并请求(merge request)中的相关提交。


// Proto file to send large amount of data using stream
syntax = "proto3";
package demo_grpc;

message Streaming {
  repeated int32 data_collection = 1 [packed=true];
  int32 index = 2;
}

// Following messages will be used for Bi-directional streaming (chunk data)

// Client side message: one chunk plus the metadata the server needs to reassemble the data.
message Large_Data_Request {
  // Data container where chunk data will be stored to pass as stream from client to server
  repeated int32 chunk_data_client_request = 1 [packed=true];
  // Size of the whole data stream from client to server (Eg: vec.size()).
  // uint64 because >= 10 GB of int32 exceeds 2^32 values; wire-compatible with the former uint32.
  uint64 client_data_stream_size = 2;
  // Number of data samples in each (full) chunk
  uint32 chunk_data_length = 3;
  // Number of chunks to complete the streaming
  uint32 required_chunk = 4;
  // Name of the data stream
  string name = 5;
}

// Server side message
message Large_Data_Response {
  // Data container to be used to do stream operation from server to client
  repeated int32 chunk_data_server_response = 1 [packed=true];
  // Size of the whole data stream from server to client (Eg: vec.size()).
  // uint64: int32 overflows above ~8 GB of int32 data; wire-compatible for non-negative values.
  uint64 server_data_stream_size = 2;
}


#include "client_zone.h"
// Calculates the number of chunk
* param [in] data_size Total size of given/provided data which will be passed as chunk. Eg: myvec.size()
* param [in] chunk_size Chunk size in KiloByte. This chunk size will be given by the user. Right now less than 64 KB is not allowed
* param [in] sample Data sample in each chunk
void grpc_client::get_chunk_number(uint64_t data_size,
uint64_t chunk_size,
uint64_t &sample,
uint32_t &total_chunk)
// Each integer size is 4 Byte. That's why N integer size will be N * 4 bit
// To convert this into KB we have to divide the prior result with 1024. Eg: (vector.size() * 4) / 1024
uint64_t data_size_kb = (4ULL * data_size) / 1024;
float total_chunk_intermediate = static_cast<float>(data_size) / sample;
total_chunk = std::ceil(total_chunk_intermediate);
// After producing large data set it passes them & get response from server as chunk
void grpc_client::data_chunk_stream_request()
demo_grpc::Large_Data_Request request_;
demo_grpc::Large_Data_Response response_;
// This chunk size will be provided by the user. It will not be greater than 4 MB and will be provided in KB unit.
// Eg: If you want to provide a chunk size of 3 MB you have to provide 3072 (3MB = 3 * 1024 KB)
// Also, this value will be a multiple of 64. That means at least a chunk size should be provided which can carry
// 64 KB data
uint64_t chunk_size;
std::cout << std:: endl << "Provide chunk size (KB). Not more than 4 MB. Will be multiple of 64: ";
std::cin >> chunk_size;
if (static_cast<float>(chunk_size) / 1024 > 4)
throw std::runtime_error("Chunk size is > 4 MB. Message field of protobuf is limited to 4 MB");
if (chunk_size % 64 != 0)
throw std::runtime_error("Please provide chunk size in multiple of 64 otherwise Number of chunk will be fractional");
// This vector is the container to hold dummy data created in client side
std::vector<int32_t> dummy_data_set;
// This is the size of the vector which will be chose by customer
uint32_t vector_size;
std::cout << "Dummy data will be create in client side. Please provide a size of vector: ";
std::cin >> vector_size;
for(int64_t i = 0; i < vector_size; i++)
// Number of data in each chunk. Each integer is 4 byte. If chunk size is 64 KB that means "sample" will be number of
// Integers resides in this each chunk size.
// Convert this 64 KB to byte = 64 * 1024 Byte.
// 4 byte is taken by 1 integer, so, 64 * 1024 bytes is taken by (64*1024)/4 integer
uint64_t sample = (chunk_size * 1024)/4;
// Number of chunk to transfer whole data set (here, dummy_data_set)
uint32_t total_chunk = 0;
get_chunk_number(dummy_data_set.size(), chunk_size, sample, total_chunk);
// Proto Buffer message is preparing to pass to Server
grpc::ClientContext context;
std::shared_ptr<::grpc::ClientReaderWriter< ::demo_grpc::Large_Data_Request, ::demo_grpc::Large_Data_Response> > stream(stub->Stream_Chunk_Service(&context));
int32_t temp_count = 0;
while(total_chunk > 0)
for (int64_t i = temp_count * sample; i < sample + temp_count * sample; i++)
// This condition checks the iteration number with the vector data size of the client
// if client data size has reached then writing will be stopped
if (i < dummy_data_set.size())
// message field will be cleared after passing one chunk. It provides a fresh repeated field in the next iteration to pass a new chunk
// Reading chunk response from server
std::vector<int32_t> dummy_final_data_set;
// This variable checks data size of client during read back from server
uint32_t client_track_data_size_before_reading = 0;
while (stream->Read(&response_))
for(int64_t i = 0; i < request_.chunk_data_length(); i++)
if (client_track_data_size_before_reading < dummy_data_set.size())
grpc::Status status = stream->Finish();
if (status.ok())
if (dummy_final_data_set.size() == response_.server_data_stream_size() | dummy_final_data_set.size() < response_.server_data_stream_size())
std::cout << "Server successfully has sent all data" << std::endl;
std::cout << "!!!!! Server is Failed to Stream !!!!!" << std::endl;


This is the server-side service response algorithm that streams
the data back to the client in chunks.
#include <numeric>
// Server-side handler for the chunked bi-directional stream: reads every chunk sent by the client,
// reassembles the data, doubles each value and streams the result back using the client's chunk length.
// Throws std::runtime_error if the number of received values differs from the size announced by the
// client, or if the client announced a chunk length of 0 for non-empty data.
// NOTE(review): an exception escaping a gRPC handler is not turned into a Status; the caller should catch it.
void Data_Chunk_Stream_Response(::grpc::ServerReaderWriter< ::demo_grpc::Large_Data_Response, ::demo_grpc::Large_Data_Request>* stream)
{
    // Instance of request and response from client & server respectively
    demo_grpc::Large_Data_Request request;
    demo_grpc::Large_Data_Response response;

    // All data transferred from the client, in arrival order.
    std::vector<int32_t> server_dummy_data_set;
    // Stream metadata copied while reading, so `request` is not consulted after Read() returned false.
    uint64_t client_data_stream_size = 0;
    uint64_t chunk_data_length = 0;

    while (stream->Read(&request))
    {
        client_data_stream_size = request.client_data_stream_size();
        chunk_data_length = request.chunk_data_length();
        // Iterate over the values actually present in this chunk (the last chunk may be partial)
        // and never store more values than the client announced.
        for (int i = 0; i < request.chunk_data_client_request_size(); ++i)
        {
            if (server_dummy_data_set.size() < client_data_stream_size)
                server_dummy_data_set.push_back(request.chunk_data_client_request(i));
        }
    }

    if (server_dummy_data_set.size() != client_data_stream_size)
        throw std::runtime_error("Caution !!! Data sample is mismatched");
    std::cout << "Prepared data in server is identical to the data size transferred by client" << std::endl;

    // Guards the write loop below against an infinite loop.
    if (chunk_data_length == 0 && !server_dummy_data_set.empty())
        throw std::runtime_error("Client announced a chunk length of 0");

    // Server streams the data back chunk by chunk; as a dummy operation each value is multiplied by 2.
    response.set_server_data_stream_size(server_dummy_data_set.size());
    for (uint64_t first = 0; first < server_dummy_data_set.size(); first += chunk_data_length)
    {
        const uint64_t last = (server_dummy_data_set.size() - first < chunk_data_length)
                                  ? server_dummy_data_set.size()
                                  : first + chunk_data_length;
        // Start every chunk with an empty repeated field so earlier values are not re-sent.
        response.clear_chunk_data_server_response();
        for (uint64_t j = first; j < last; ++j)
            response.add_chunk_data_server_response(server_dummy_data_set[j] * 2);  // NOTE: overflows outside [-2^30, 2^30)
        if (!stream->Write(response))
            break;  // the client went away; nothing more can be sent
    }
}

