如何将没有终止字符和大小的返回消息一起使用提升async_read?



我正在尝试实现一个带有异步套接字的ssl客户端,该客户端从服务器发送和接收protobuf消息。消息的格式是前 4 个字节表示消息的大小在后面 (X),其余的是具有 X 字节的响应消息。

问题是我不知道如何获取前 4 个字节来获取消息大小以读取其余部分。

async_read()需要确切的大小,即 4 个字节,但我不知道下一步该怎么做?async_read_until()需要一个消息没有的终止字符。

怎么做?我来自Java和C#,对boost和C++并不熟悉。

源代码附在下面。请搜索"TODO"以到达执行读取操作的 LOC...

PS:我无法从服务器更改任何内容。仅将二进制文件:(

#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/bind.hpp>
#include <iostream>
#include <istream>
#include <ostream>
#include <string>
#include "client/connection/authentication.pb.h"
#include "client/connection/authentication.pb.cc"
#include "client/msg.pb.h"
#include "client/msg.pb.cc"
#include "client/common.pb.h"
#include "client/common.pb.cc"
class client
{
public:
client(boost::asio::io_service& io_service, boost::asio::ssl::context& context, boost::asio::ip::tcp::resolver::iterator endpoint_iterator)
: socket_(io_service, context)
{
socket_.set_verify_mode(boost::asio::ssl::context::verify_none);
socket_.set_verify_callback(boost::bind(&client::verify_certificate, this, _1, _2));
boost::asio::async_connect(socket_.lowest_layer(), endpoint_iterator, boost::bind(&client::handle_connect, this, boost::asio::placeholders::error));
}
bool verify_certificate(bool preverified, boost::asio::ssl::verify_context& ctx)
{
char subject_name[256];
X509* cert = X509_STORE_CTX_get_current_cert(ctx.native_handle());
X509_NAME_oneline(X509_get_subject_name(cert), subject_name, 256);
std::cout << "Verifying:n" << subject_name << std::endl;
return preverified;
}
void handle_connect(const boost::system::error_code& error)
{
if(!error){
std::cout << "Connection OK!" << std::endl;
socket_.async_handshake(boost::asio::ssl::stream_base::client, boost::bind(&client::handle_handshake, this, boost::asio::placeholders::error));
}else{
std::cout << "Connect failed: " << error.message() << std::endl;
}
}
void handle_handshake(const boost::system::error_code& error)
{
if(!error){
std::cout << "Sending request: " << std::endl;
//      std::stringstream request_;
//      request_ << "GET /api/0/data/ticker.php HTTP 1.1rn";
//      request_ << "Host: mtgox.comrn";
//      request_ << "Accept-Encoding: *rn";
//      request_ << "rn";
protobuf::Message msg;
char *data = new char[dlen];
bool ok = msg.SerializeToArray(data,dlen);
//    uint32_t n = htonl(dlen);
uint32_t n = dlen;
//    char bytes[4];
//    bytes[0] = (n >> 24) & 0xFF;
//    bytes[1] = (n >> 16) & 0xFF;
//    bytes[2] = (n >> 8) & 0xFF;
//    bytes[3] = n & 0xFF;
int sizeOfPacket = 4 + dlen;
char* rq = new char[sizeOfPacket];
//    strncpy(rq, bytes, 4);
rq[0] = (n >> 24) & 0xFF;
rq[1] = (n >> 16) & 0xFF;
rq[2] = (n >> 8) & 0xFF;
rq[3] = n & 0xFF;
strncpy(rq + 4,data, dlen);
//       request_<< rq;
//      std::cerr << request_.str() << std::endl;
boost::asio::async_write(socket_, boost::asio::buffer(rq,sizeOfPacket), boost::bind(&client::handle_write, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
}else{
std::cout << "Handshake failed: " << error.message() << std::endl;
}
}
void handle_write(const boost::system::error_code& error, size_t bytes_transferred)
{
if (!error){
std::cout << "Sending request OK!" << std::endl;
char respond[4] = "";
boost::asio::async_read(socket_, boost::asio::buffer(respond,4),
boost::bind(&client::handle_read,
this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
//      std::cerr << "respond is " << respond;
//TODO
}else{
std::cout << "Write failed: " << error.message() << std::endl;
}
}
void handle_read(const boost::system::error_code& error, size_t bytes_transferred)
{
if (!error){
std::cout << "Reply: ";
std::cout.write(reply_, bytes_transferred);
std::cout << "n";
}else{
std::cout << "Read failed: " << error.message() << std::endl;
}
}
private:
boost::asio::ssl::stream<boost::asio::ip::tcp::socket> socket_;
char reply_[0x1 << 16];
};
int main(int argc, char* argv[])
{
try{
boost::asio::io_service io_service;
boost::asio::ip::tcp::resolver resolver(io_service);
boost::asio::ip::tcp::resolver::query query("192.168.2.32", "443");
boost::asio::ip::tcp::resolver::iterator iterator = resolver.resolve(query);
boost::asio::ssl::context context(boost::asio::ssl::context::sslv23);
//    context.load_verify_file("key.pem");
client c(io_service, context, iterator);
io_service.run();
}catch (std::exception& e){
std::cerr << "Exception: " << e.what() << "n";
}
std::cin.get();
return 0;
}

更新:这个实现有什么问题吗?

void handle_write(const boost::system::error_code& error, size_t bytes_transferred)
{
if (!error){
std::cout << "Sending request OK!" << std::endl;
char respond[4] = "";
boost::asio::async_read(socket_, boost::asio::buffer(respond,4),
boost::bind(&client::handle_read,
this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
int sizeOfMessage = getSizeFromHeader(respond);//need implement
char message[sizeOfMessage] ="";
boost::asio::async_read(socket_, boost::asio::buffer(message,sizeOfMessage),
boost::bind(&client::handle_read,
this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
decodeMessage(message);
//      std::cerr << "respond is " << respond;
//TODO

}else{
std::cout << "Write failed: " << error.message() << std::endl;
}
}
char message[sizeOfMessage] ="";
boost::asio::async_read(socket_, boost::asio::buffer(message,sizeOfMessage),
boost::bind(&client::handle_read,
this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
decodeMessage(message);

这永远不起作用,因为message是一个局部变量,因此在异步操作运行/完成之前它就不复存在了。

这里也一样:

char respond[4] = "";
ba::async_read(
socket_, ba::buffer(respond, 4),
boost::bind(&client::handle_read, this, ba::placeholders::error, ba::placeholders::bytes_transferred));
//      std::cerr << "respond is " << respond;
// TODO

有趣的是,handle_read你不使用它,你使用reply_哪个是成员。那就好多了。


char *data = new char[dlen];
bool ok = msg.SerializeToArray(data, dlen);

未定义dlen。即使您确实定义了它,也永远不会检查ok。哎哟。接下来,data永远不会deleted[]-ed。

uint32_t n = dlen;
//    char bytes[4];
//    bytes[0] = (n >> 24) & 0xFF;
//    bytes[1] = (n >> 16) & 0xFF;
//    bytes[2] = (n >> 8) & 0xFF;
//    bytes[3] = n & 0xFF;
int sizeOfPacket = 4 + dlen;
char *rq = new char[sizeOfPacket];
//    strncpy(rq, bytes, 4);
rq[0] = (n >> 24) & 0xFF;
rq[1] = (n >> 16) & 0xFF;
rq[2] = (n >> 8) & 0xFF;
rq[3] = n & 0xFF;
strncpy(rq + 4, data, dlen);

到现在为止,您再次复制了所有内容(即使序列化失败),并且又有一次内存泄漏,这次大了 4 个字节。

//      std::cerr << request_.str() << std::endl;

将二进制数据写入std::cerr不会很漂亮(并且通常由于 0 个字符而不起作用)。


现在终于,当您handle_read您可能会遇到SSLSSL_R_SHORT_READ,因为缓冲区具有固定的大小并且响应可能较小(如果响应较大,您将不知道)。所以处理这种情况:

if (!error || error == ssl::error::stream_errors::stream_truncated) {

有点固定的样本

住在科里鲁

//#include "client/common.pb.cc"
//#include "client/common.pb.h"
//#include "client/connection/authentication.pb.cc"
//#include "client/connection/authentication.pb.h"
//#include "client/msg.pb.cc"
//#include "client/msg.pb.h"
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/asio/ssl/error.hpp>
#include <boost/bind.hpp>
#include <iostream>
#include <istream>
#include <ostream>
#include <string>
#include <cstring>
namespace ba = boost::asio;
namespace ssl = ba::ssl;
using ba::ip::tcp;
static constexpr int dlen = 300;
namespace protobuf {
struct Message {
bool SerializeToArray(char* p, size_t n) {
strncpy(p, "hello worldn", n);
return true;
}
};
} // namespace protobuf
class client {
public:
client(ba::io_service &io_service, ssl::context &context, tcp::resolver::iterator endpoint_iterator)
: socket_(io_service, context) 
{
socket_.set_verify_mode(ssl::context::verify_none);
socket_.set_verify_callback(boost::bind(&client::verify_certificate, this, _1, _2));
ba::async_connect(socket_.lowest_layer(), endpoint_iterator,
boost::bind(&client::handle_connect, this, ba::placeholders::error));
}
bool verify_certificate(bool preverified, ssl::verify_context &ctx) {
char subject_name[256];
X509 *cert = X509_STORE_CTX_get_current_cert(ctx.native_handle());
X509_NAME_oneline(X509_get_subject_name(cert), subject_name, 256);
std::cout << "Verifying:n" << subject_name << std::endl;
return preverified;
}
void handle_connect(const boost::system::error_code &error) {
if (!error) {
std::cout << "Connection OK!" << std::endl;
socket_.async_handshake(ssl::stream_base::client,
boost::bind(&client::handle_handshake, this, ba::placeholders::error));
} else {
std::cout << "Connect failed: " << error.message() << std::endl;
}
}
void handle_handshake(const boost::system::error_code &error) {
if (!error) {
std::cout << "Sending request: " << std::endl;
// prepare request_ buffer
{
request_.consume(request_.size());
std::ostream os(&request_);
os << "GET /api/0/data/ticker.php HTTP 1.1rn";
os << "Host: mtgox.comrn";
os << "Accept-Encoding: *rn";
os << "rn";
std::vector<char> data(dlen + 4); // perhaps avoid SerializeToArray and skip the copy
// fill length
{
uint32_t n = htonl(dlen);
data[0] = (n >> 24) & 0xFF;
data[1] = (n >> 16) & 0xFF;
data[2] = (n >> 8) & 0xFF;
data[3] = n & 0xFF;
}
// fill message
{
protobuf::Message msg;
bool ok = msg.SerializeToArray(data.data() + 4, dlen);
if (!ok)
throw std::runtime_error("Do something!");
}
// write buffer
os.write(data.data(), data.size());
}
ba::async_write(
socket_, request_,
boost::bind(&client::handle_write, this, ba::placeholders::error, ba::placeholders::bytes_transferred));
} else {
std::cout << "Handshake failed: " << error.message() << std::endl;
}
}
void handle_write(const boost::system::error_code &error, size_t bytes_transferred) {
if (!error) {
std::cout << "Sending request OK! (" << bytes_transferred << " bytes)" << std::endl;
ba::async_read(
socket_, ba::buffer(reply_),
boost::bind(&client::handle_read, this, ba::placeholders::error, ba::placeholders::bytes_transferred));
} else {
std::cout << "Write failed: " << error.message() << std::endl;
}
}
void handle_read(const boost::system::error_code &error, size_t bytes_transferred) {
if (!error || error == ssl::error::stream_errors::stream_truncated) {
std::cout << "Reply: ";
std::cout.write(reply_, bytes_transferred);
std::cout << "n";
} else {
std::cout << "Read failed: " << error.message() << std::endl;
}
}
private:
ssl::stream<tcp::socket> socket_;
ba::streambuf request_;
char reply_[1 << 16];
};
int main() {
try {
ba::io_service io_service;
tcp::resolver resolver(io_service);
tcp::resolver::query query("192.168.2.32", "443");
tcp::resolver::iterator iterator = resolver.resolve(query);
ssl::context context(ssl::context::sslv23);
// context.load_verify_file("key.pem");
client c(io_service, context, iterator);
io_service.run();
} catch (std::exception &e) {
std::cerr << "Exception: " << e.what() << "n";
}
}

针对本地测试 SSL 服务器进行测试时(密码短语"test"):

openssl s_server -accept 6767 -cert ~/custom/boost/libs/asio/example/cpp03/ssl/server.pem -CAfile ~/custom/boost/libs/asio/example/cpp03/ssl/ca.pem 

它打印了:

./sotest 
Connection OK!
Verifying:
/C=AU/ST=NSW/L=Sydney/O=asio
Sending request: 
Sending request OK! (380 bytes)
error: asio.ssl:335544539 (short read)
Reply: asdasdasdasdasd

显然,asdasdasdasdasd是我在s_server末尾输入的响应。

相关内容

最新更新