如何使用gRPC传递大量数据(未知大小,最小10GB)



我从一个源获得流数据,在最终处理之前,这些数据的大小是未知的,但最小值是10 GB。我必须使用gRPC发送这么大量的数据。

需要说明的是,流数据处理完成后,这些大量数据将通过gRPC发送。在这一步中,我考虑将所有值存储在一个vector中。

关于发送大量数据,我试图获得想法,发现:
  • 这里提到不要使用gRPC传递大数据,而应改用其他消息协议。但我受限于只能使用gRPC,不能使用其他协议(至少到目前为止如此)。
  • 从这篇帖子中,我试图知道如何发送chunk message,但我不确定它是否与我的问题有关
  • 我在第一篇文章中找到了一个使用go语言流式传输数据的博客
  • 这是本文使用python语言进行的演示。但它也是不完整的
  • gRPC示例可能是一个良好的开端,但由于我的C++知识不足,我没能看懂它

从那里开始,我在这个问题上做了一个巨大的更新。但问题的主题没有改变

到目前为止我做了什么,以及关于我的项目的一些要点。github回购在这里可用
  • 项目中存在Unary rpc

  • 我知道新的Bi directional rpc需要一些时间。我希望Unary rpc不必等待Bi directional rpc完成。目前的实现是同步的:Unary rpc要等到流式传输完成并返回其status之后才会执行。

  • 我省略了C++代码中不必要的部分,但给出了完整的proto文件

  • big_data.proto

syntax = "proto3";

package demo_grpc;

// One chunk of a large int32 data set that is streamed over gRPC.
message Large_Data {
  // Values carried by this chunk (packed encoding).
  repeated int32 large_data_collection = 1 [packed = true];
  // Chunk identifier; presumably the chunk's position in the stream (TODO confirm).
  int32 data_chunk_number = 2;
}
  • addressbook.proto
syntax = "proto3";

package demo_grpc;

import "myproto/big_data.proto";

// Reply message of the unary GetAddress call.
message S_Response {
  string name            = 1;
  string street          = 2;
  string zip             = 3;
  string city            = 4;
  string country         = 5;
  int32  double_init_val = 6;
}

// Request message of the unary GetAddress call.
message C_Request {
  uint32 choose_area = 1;
  string name        = 2;
  int32  init_val    = 3;
}

service AddressBook {
  // Unary RPC: one request, one response.
  rpc GetAddress(C_Request) returns (S_Response) {}
  // Bidirectional streaming RPC: Large_Data chunks flow in both directions.
  rpc Stream_Chunk_Service(stream Large_Data) returns (stream Large_Data) {}
}
  • client.cpp
#include <big_data.pb.h>
#include <addressbook.grpc.pb.h>
#include <grpcpp/grpcpp.h>
#include <grpcpp/create_channel.h>
#include <cstdint>
#include <iostream>
#include <memory>
#include <numeric>
#include <string>
#include <thread>
#include <vector>
using namespace std;
// Fills `request_` for the unary GetAddress call. Per the original description
// it prompts the user for the required area; that code was intentionally left
// out of this excerpt, so the body is currently empty.
void Client_Request(demo_grpc::C_Request &request_)
{
    static_cast<void>(request_);  // unused until the omitted unary-RPC code is restored
}
// Displays the fields of `response_` that correspond to what `request_` asked
// for. The unary-RPC display code was intentionally left out of this excerpt,
// so the body is currently empty.
void Server_Response(demo_grpc::C_Request &request_, const demo_grpc::S_Response &response_)
{
    // Both parameters are unused until the omitted unary-RPC code is restored.
    static_cast<void>(request_);
    static_cast<void>(response_);
}
// Builds a (dummy) large data set, splits it into chunks and streams them to
// the server over ONE bidirectional Stream_Chunk_Service call, while reading
// the server's reply chunks concurrently.
//
// request_  Scratch message reused for every outgoing chunk; its repeated field
//           is cleared after each Write so the next chunk starts empty.
// response_ Receives each reply chunk; after return it holds the last reply that
//           was successfully read (if any).
// preferred_chunk_size_in_kibyte  Desired payload size of one chunk in KiB; the
//           number of int32 values per chunk is computed by total_chunk_counter.
//
// Writes run on a separate thread and reads on this one. This is the pattern
// used by gRPC's route_guide RouteChat example. If every chunk were written
// before any reply was read, a server that answers each chunk immediately could
// block on a full flow-control window, and both sides would stall.
void Stream_Data_Chunk_Request(demo_grpc::Large_Data &request_,
                               demo_grpc::Large_Data &response_,
                               uint64_t preferred_chunk_size_in_kibyte)
{
    // A dummy vector which in the real case will be the large data set's container.
    const std::vector<int32_t> large_vector(1024 * 10, 1);

    // Number of integers one chunk carries; filled in by total_chunk_counter.
    uint64_t ints_per_chunk = 0;
    const uint32_t total_chunk = total_chunk_counter(large_vector.size(),
                                                     preferred_chunk_size_in_kibyte,
                                                     ints_per_chunk);
    if (total_chunk == 0 || ints_per_chunk == 0)
    {
        std::cout << "Nothing to stream (empty data or zero chunk size)" << std::endl;
        return;
    }

    // Channel, stub, context and stream are created exactly once: every chunk
    // travels over the same streaming call.
    const std::string ip_address = "localhost:50051";
    auto channel = grpc::CreateChannel(ip_address, grpc::InsecureChannelCredentials());
    std::unique_ptr<demo_grpc::AddressBook::Stub> stub = demo_grpc::AddressBook::NewStub(channel);
    grpc::ClientContext context;
    std::unique_ptr<grpc::ClientReaderWriter<demo_grpc::Large_Data, demo_grpc::Large_Data>> stream(
        stub->Stream_Chunk_Service(&context));

    request_.clear_large_data_collection();

    // Written only by the writer thread; read only after join().
    uint32_t chunks_sent = 0;
    std::thread writer([&]() {
        const uint64_t total = large_vector.size();
        for (uint32_t chunk = 0; chunk < total_chunk; ++chunk)
        {
            const uint64_t begin = static_cast<uint64_t>(chunk) * ints_per_chunk;
            if (begin >= total)
            {
                break;
            }
            // The last chunk may be shorter than ints_per_chunk; never index past the end.
            const uint64_t end = (total - begin > ints_per_chunk) ? begin + ints_per_chunk : total;
            for (uint64_t i = begin; i < end; ++i)
            {
                request_.add_large_data_collection(large_vector[i]);
            }
            request_.set_data_chunk_number(static_cast<int32_t>(chunk));

            const bool written = stream->Write(request_);
            // Clear after every chunk so the next one starts from an empty field.
            request_.clear_large_data_collection();
            if (!written)
            {
                break;  // the stream is broken; Finish() below reports the reason
            }
            ++chunks_sent;
        }
        // Half-close: tells the server that no more chunks will follow.
        stream->WritesDone();
    });

    uint64_t chunks_received = 0;
    uint64_t values_received = 0;
    while (stream->Read(&response_))
    {
        ++chunks_received;
        values_received += static_cast<uint64_t>(response_.large_data_collection_size());
    }
    writer.join();

    const grpc::Status status = stream->Finish();
    if (status.ok())
    {
        std::cout << "Streamed " << chunks_sent << " chunk(s); received " << chunks_received
                  << " chunk(s) with " << values_received << " value(s)" << std::endl;
    }
    else
    {
        std::cout << "Chunk stream failed: " << status.error_message() << std::endl;
    }
}
int main(int argc, char* argv[])
{
std::string client_address = "localhost:50051";
std::cout << "Address of client: " << client_address << std::endl;
// The following part for the Unary RPC
demo_grpc::C_Request query;
demo_grpc::S_Response result;
Client_Request(query);
// This part for the streaming chunk data (Bi directional Stream RPC)
demo_grpc::Large_Data stream_chunk_request_;
demo_grpc::Large_Data stream_chunk_response_;
uint64_t preferred_chunk_size_in_kibyte = 64;
Stream_Data_Chunk_Request(stream_chunk_request_, stream_chunk_response_, preferred_chunk_size_in_kibyte);
// Call
auto channel = grpc::CreateChannel(client_address, grpc::InsecureChannelCredentials());
std::unique_ptr<demo_grpc::AddressBook::Stub> stub = demo_grpc::AddressBook::NewStub(channel);
grpc::ClientContext context;
grpc::Status status = stub->GetAddress(&context, query, &result);
// the following status is for unary rpc as far I have understood the structure
if (status.ok())
{
Server_Response(query, result);
}
else
{
std::cout << status.error_message() << std::endl;
}
return 0;
}
  • helper function total_chunk_counter
#include <cmath>
/**
 * Computes how many int32 values fit into one chunk and how many chunks are
 * needed to send all values.
 *
 * @param num_of_container_content  Number of int32 elements to send.
 * @param preferred_chunk_size_in_kibyte  Desired payload size of one chunk, in KiB.
 * @param[out] preferred_chunk_size_in_kibyte_holds_integer_num  Number of int32
 *        values per chunk: (chunk KiB * 1024) / 4, because an int32 takes 4 bytes.
 * @return Number of chunks needed (ceiling division). Returns 0 when there is
 *         nothing to send or when the chunk size is too small to hold one value.
 *
 * Note: this is a raw-size estimate. Packed int32 fields are varint-encoded on
 * the wire, so the actual encoded chunk size depends on the values.
 */
uint32_t total_chunk_counter(uint64_t num_of_container_content,
                             uint64_t preferred_chunk_size_in_kibyte,
                             uint64_t &preferred_chunk_size_in_kibyte_holds_integer_num)
{
    constexpr uint64_t kBytesPerKibyte = 1024;
    constexpr uint64_t kBytesPerInt32 = 4;  // 32 bits = 4 bytes (not 32 bytes)

    const uint64_t ints_per_chunk = (preferred_chunk_size_in_kibyte * kBytesPerKibyte) / kBytesPerInt32;
    preferred_chunk_size_in_kibyte_holds_integer_num = ints_per_chunk;
    if (ints_per_chunk == 0)
    {
        return 0;  // avoids division by zero for a zero chunk size
    }
    // Integer ceiling division. Unlike a float quotient, it stays exact for
    // element counts above 2^24, which 10+ GB data sets easily exceed.
    const uint64_t chunks = num_of_container_content / ints_per_chunk
                          + (num_of_container_content % ints_per_chunk != 0 ? 1 : 0);
    return static_cast<uint32_t>(chunks);
}
  • server.cpp(尚未完成)
#include <myproto/big_data.pb.h>
#include <myproto/addressbook.grpc.pb.h>
#include <grpcpp/grpcpp.h>
#include <grpcpp/server_builder.h>
#include <iostream>
class AddressBookService final : public demo_grpc::AddressBook::Service {
public:
virtual ::grpc::Status GetAddress(::grpc::ServerContext* context, const ::demo_grpc::C_Request* request, ::demo_grpc::S_Response* response)
{
switch (request->choose_area())
{
// do processing for unary rpc. Intentionally avoided here
std::cout << "Information of " << request->choose_area() << " is sent to Client" << std::endl;
return grpc::Status::OK;
}

// Bi-directional streaming chunk data
virtual ::grpc::Status Stream_Chunk_Service(::grpc::ServerContext* context, ::grpc::ServerReaderWriter< ::demo_grpc::Large_Data, ::demo_grpc::Large_Data>* stream)
{
// stream->Large_Data;
return grpc::Status::OK;
}
};
// Starts the AddressBook gRPC server on localhost:50051 and blocks until it is
// shut down. Reports an error and returns if the server could not be started.
void RunServer()
{
    std::cout << "grpc Version: " << grpc::Version() << std::endl;
    std::string server_address = "localhost:50051";
    std::cout << "Address of server: " << server_address << std::endl;
    grpc::ServerBuilder builder;
    builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
    AddressBookService my_service;
    builder.RegisterService(&my_service);
    std::unique_ptr<grpc::Server> server(builder.BuildAndStart());
    // BuildAndStart() returns nullptr on failure (e.g. the port is already in use).
    if (!server)
    {
        std::cerr << "Failed to start server on " << server_address << std::endl;
        return;
    }
    server->Wait();
}
int main(int argc, char* argv[])
{
    // Command-line arguments are not used; the server address is fixed in RunServer().
    static_cast<void>(argc);
    static_cast<void>(argv);
    RunServer();
    return 0;
}

总之,我的愿望

  • 我需要把large_vector的内容通过消息Large_Data的repeated字段large_data_collection传递。我应该按chunk大小切分large_vector,并用每个chunk填充repeated字段large_data_collection
  • 在服务器端,所有接收到的chunk将按data_chunk_number保存并合并,然后对它们进行一些处理(例如:将每个元素的值加倍)。之后,整个数据将再次作为chunk stream发送回client
  • 如果现在的unary rpc不等待bi-directional rpc的完成,那就太好了

带示例的解决方案将非常有用。提前感谢。github回购在这里可用。

由于许多个人问题,我很晚才公布问题的答案。顺便说一下,我通过以下方式实现了我的解决方案。我的解决方案的一点总结:

  • 我试图传递大量数据,为此我选择了一个大尺寸的矢量,其大小会有所不同
  • 我已经计算了向量中的样本数量,选择了块的大小,并计算了每个块将传递多少整数
  • 最后,根据块的数量逐个传递数据
  • 我选择的一个限制是:块的大小(以KB为单位)应该是64或者64的倍数
  • 完整的解决方案在我的git repo中可用。我在这里只链接了合并请求的提交

协议消息

// Proto file to send large amount of data using stream
syntax = "proto3";
package demo_grpc;
message Streaming
{
repeated int32 data_collection = 1 [packed=true];
int32 index = 2;
}
// Following messages will be used for Bi-directional streaming (chunk data)
// Client side message
message Large_Data_Request {
repeated int32 chunk_data_client_request = 1 [packed=true]; // Data container where chunk data will be stored to pass as stream from client to server
// Size of the whole data stream from client to server (Eg: vec.size()).
// 64-bit because a 10+ GB int32 data set has billions of samples, at or beyond
// the uint32 range. uint32 -> uint64 is wire-compatible.
uint64 client_data_stream_size = 2;
uint32 chunk_data_length = 3; // Number of data samples in each chunk
uint32 required_chunk = 4; // Number of chunk to complete the Streaming
string name = 5; // Name of the data stream
}
// Server side message
message Large_Data_Response {
repeated int32 chunk_data_server_response = 1 [packed=true]; // Data container to be used to do stream operation from server to client
// Size of the whole data stream from server to client (Eg: vec.size()).
// int64 because int32 overflows at 2^31 samples (8 GiB of int32 data).
// int32 -> int64 is wire-compatible.
int64 server_data_stream_size = 2;
}

客户端请求发送流数据


#include "client_zone.h"
// Calculates the number of chunks
/**
* param [in]     data_size   Total number of int32 samples to pass as chunks. Eg: myvec.size()
* param [in]     chunk_size  Chunk size in KB given by the user (multiple of 64; less than 64 KB is not allowed).
*                            Only used to derive `sample` when the caller passes sample == 0.
* param [in,out] sample      Number of int32 samples in each chunk. An int32 takes 4 bytes, so a chunk of
*                            chunk_size KB holds (chunk_size * 1024) / 4 samples.
* param [out]    total_chunk Number of chunks needed: ceil(data_size / sample), or 0 if sample is 0.
*/
void grpc_client::get_chunk_number(uint64_t data_size,
                                   uint64_t chunk_size,
                                   uint64_t &sample,
                                   uint32_t &total_chunk)
{
    if (sample == 0)
    {
        // Each integer is 4 bytes, so chunk_size KB holds (chunk_size * 1024) / 4 integers.
        sample = (chunk_size * 1024) / 4;
    }
    if (sample == 0)
    {
        total_chunk = 0;  // nothing fits into a chunk; avoid division by zero
        return;
    }
    // Integer ceiling division. A float quotient loses precision above 2^24
    // samples (only 64 MiB of int32 data) and could miscount the chunks.
    total_chunk = static_cast<uint32_t>(data_size / sample + (data_size % sample != 0 ? 1 : 0));
}
// After producing large data set it passes them & get response from server as chunk
void grpc_client::data_chunk_stream_request()
{
demo_grpc::Large_Data_Request request_;
demo_grpc::Large_Data_Response response_;
// This chunk size will be provided by the user. It will not be greater than 4 MB and will be provided in KB unit.
// Eg: If you want to provide a chunk size of 3 MB you have to provide 3072 (3MB = 3 * 1024 KB)
// Also, this value will be a multiple of 64. That means at least a chunk size should be provided which can carry
// 64 KB data
uint64_t chunk_size;
std::cout << std:: endl << "Provide chunk size (KB). Not more than 4 MB. Will be multiple of 64: ";
std::cin >> chunk_size;
if (static_cast<float>(chunk_size) / 1024 > 4)
{
throw std::runtime_error("Chunk size is > 4 MB. Message field of protobuf is limited to 4 MB");
}
if (chunk_size % 64 != 0)
{
throw std::runtime_error("Please provide chunk size in multiple of 64 otherwise Number of chunk will be fractional");
}
// This vector is the container to hold dummy data created in client side
std::vector<int32_t> dummy_data_set;
// This is the size of the vector which will be chose by customer
uint32_t vector_size;
std::cout << "Dummy data will be create in client side. Please provide a size of vector: ";
std::cin >> vector_size;
for(int64_t i = 0; i < vector_size; i++)
{
dummy_data_set.push_back(2);
}
// Number of data in each chunk. Each integer is 4 byte. If chunk size is 64 KB that means "sample" will be number of
// Integers resides in this each chunk size.
// Convert this 64 KB to byte = 64 * 1024 Byte.
// 4 byte is taken by 1 integer, so, 64 * 1024 bytes is taken by (64*1024)/4 integer
uint64_t sample = (chunk_size * 1024)/4;
// Number of chunk to transfer whole data set (here, dummy_data_set)
uint32_t total_chunk = 0;
get_chunk_number(dummy_data_set.size(), chunk_size, sample, total_chunk);
// Proto Buffer message is preparing to pass to Server
request_.set_chunk_data_length(sample);
request_.set_client_data_stream_size(dummy_data_set.size());
request_.set_required_chunk(total_chunk);
request_.set_name(detected_area_name);
grpc::ClientContext context;
std::shared_ptr<::grpc::ClientReaderWriter< ::demo_grpc::Large_Data_Request, ::demo_grpc::Large_Data_Response> > stream(stub->Stream_Chunk_Service(&context));
int32_t temp_count = 0;
while(total_chunk > 0)
{
for (int64_t i = temp_count * sample; i < sample + temp_count * sample; i++)
{
// This condition checks the iteration number with the vector data size of the client
// if client data size has reached then writing will be stopped
if (i < dummy_data_set.size())
{
request_.add_chunk_data_client_request(dummy_data_set[i]);
}
else
{
break;
}
}
temp_count++;
total_chunk--;
stream->Write(request_);
// message field will be cleared after passing one chunk. It provides a fresh repeated field in the next iteration to pass a new chunk
request_.clear_chunk_data_client_request();
}
stream->WritesDone();
// Reading chunk response from server
std::vector<int32_t> dummy_final_data_set;
// This variable checks data size of client during read back from server
uint32_t client_track_data_size_before_reading = 0;
while (stream->Read(&response_))
{
for(int64_t i = 0; i < request_.chunk_data_length(); i++)
{
if (client_track_data_size_before_reading < dummy_data_set.size())
{
dummy_final_data_set.push_back(response_.chunk_data_server_response(i));
client_track_data_size_before_reading++;
}
else
{
break;
}
}
}
grpc::Status status = stream->Finish();
if (status.ok())
{
if (dummy_final_data_set.size() == response_.server_data_stream_size() | dummy_final_data_set.size() < response_.server_data_stream_size())
{
std::cout << "Server successfully has sent all data" << std::endl;
}
}
else
std::cout << "!!!!! Server is Failed to Stream !!!!!" << std::endl;
}

服务器端响应:处理数据后以流(stream)的形式返回

/*
This is the service response algorithm from server to pass
stream data using chunk
*/
#include <numeric>
// Server side of the chunked bidirectional stream.
//
// Phase 1: reads every chunk the client sends until the client half-closes the
// stream. It appends at most chunk_data_length values per message and at most
// client_data_stream_size values in total.
// Phase 2: doubles every value and streams the result back in chunks of the
// client's chunk_data_length.
//
// Throws std::runtime_error("Caution !!! Data sample is mismatched") when the
// number of received samples differs from the size the client announced.
//
// NOTE(review): phase 1 keeps the entire upload in memory (10+ GB for the target
// workload). If the processing is per element, answering each chunk as it
// arrives would keep memory bounded to one chunk.
void Data_Chunk_Stream_Response(::grpc::ServerReaderWriter< ::demo_grpc::Large_Data_Response, ::demo_grpc::Large_Data_Request>* stream)
{
    // Instance of request and response from client & server respectively.
    demo_grpc::Large_Data_Request request;
    demo_grpc::Large_Data_Response response;
    // This vector will hold all data which will be transferred from the client.
    std::vector<int32_t> server_dummy_data_set;
    // Metadata announced by the client (repeated in every chunk). It is captured
    // from each successfully read message, not from `request` after the final failed Read().
    uint64_t announced_size = 0;
    uint64_t chunk_length = 0;

    // Read operation
    while (stream->Read(&request))
    {
        announced_size = request.client_data_stream_size();
        chunk_length = request.chunk_data_length();
        // Iterate over the values this message really contains. The last chunk is
        // usually shorter than chunk_data_length, and indexing past it is invalid.
        const int available = request.chunk_data_client_request_size();
        for (int i = 0; i < available && static_cast<uint64_t>(i) < chunk_length; ++i)
        {
            // Stop once the announced client data size has been reached.
            if (server_dummy_data_set.size() >= announced_size)
            {
                break;
            }
            server_dummy_data_set.push_back(request.chunk_data_client_request(i));
        }
    }
    std::cout << std::endl;
    if (server_dummy_data_set.size() == announced_size)
    {
        std::cout << "Prepared data in server is identical to the data size transferred by client" << std::endl;
    }
    else
    {
        throw std::runtime_error("Caution !!! Data sample is mismatched");
    }

    // Set proto message for server side (repeated in every outgoing chunk).
    response.set_server_data_stream_size(server_dummy_data_set.size());
    // The chunk count is derived from the data actually held, not from the
    // client-supplied required_chunk. If the client announced a chunk length of
    // 0, fall back to 16384 int32 (64 KB), the smallest chunk the client accepts.
    const uint64_t per_chunk = chunk_length > 0 ? chunk_length : 16384;
    const uint64_t total = server_dummy_data_set.size();
    for (uint64_t begin = 0; begin < total; begin += per_chunk)
    {
        const uint64_t end = (total - begin > per_chunk) ? begin + per_chunk : total;
        for (uint64_t j = begin; j < end; ++j)
        {
            // Dummy processing: multiply by 2. Unsigned arithmetic wraps instead
            // of causing signed-overflow undefined behavior for large values.
            const uint32_t doubled = static_cast<uint32_t>(server_dummy_data_set[j]) * 2u;
            response.add_chunk_data_server_response(static_cast<int32_t>(doubled));
        }
        const bool written = stream->Write(response);
        // Clear the repeated field so the next iteration starts a fresh chunk.
        response.clear_chunk_data_server_response();
        if (!written)
        {
            break;  // the client went away; further writes would fail too
        }
    }
}

非常感谢任何关于更新工作、添加新功能/修改算法/想法/工作的建议

最新更新