boost::interprocess::message_queue第二个进程中未收到消息



我正在使用boost的消息队列编写一个只有两个char数组的基类,但在第二个进程中没有接收到数据是空的,即使get_num_msg()在读取前返回1,在读取后返回0。出于调试目的,我还尝试从同一个过程中进行编写和读取,结果很好。我使用共享指针是因为之前在读取和写入整数时,它不会在接收器处读取整数,除非它被声明为共享ptr。

访问队列

class AccessQueue {
public:
char name[64];
char action[64];
AccessQueue(char name[64], char action[64]) {
strcpy(this->name, name);
strcpy(this->action, action);
}
AccessQueue() {}
};

发送功能

// std::shared_ptr<AccessQueue> p1;
this->p1.reset(new AccessQueue("asd", "vsq"));
try {
this->mq->send(&p1, sizeof(p1), 0);
} catch(boost::interprocess::interprocess_exception & ex) {
std::cout << ex.what() << std::endl;
}

接收功能

std::cout << this->mq->get_num_msg() << "t" << this->mq->get_max_msg_size() << "t" << this->mq->get_max_msg() << std::endl;
AccessQueue * a;
unsigned int priority;
boost::interprocess::message_queue::size_type recvd_size;
try {
this->mq->try_receive(&a, sizeof(AccessQueue), recvd_size, priority);
} catch(boost::interprocess::interprocess_exception & ex) {
std::cout << ex.what() << std::endl;
}
std::cout << this->mq->get_num_msg() << "t" << this->mq->get_max_msg_size() << "t" << this->mq->get_max_msg() << std::endl;
std::cout << "It clearly maybe works " << a->action << "t" << a->name << std::endl;

接收器端的输出:

1       128     20
0       128     20

看起来p1(在发送函数中(是一个智能指针(类似于std::unique_ptrstd::shared_ptr(。在这种情况下

this->mq->send(&p1, sizeof(p1), 0);

显然是错误的,因为它将指针对象放在队列上,而不是数据结构上。使用

this->mq->send(*p1, sizeof(*p1), 0);

或者,事实上,首先不要使用动态分配:

AccessQueue packet("asd", "vsq");
mq.send(&packet, sizeof(packet), 0);

哦还有更多

在接收端,也存在类似的问题:

AccessQueue * a;
// ..
mq.try_receive(&a, sizeof(AccessQueue), ...);

接收INTO的是指针,而不是对象。您甚至没有对象,因为a(指针(从未初始化过。这里的解决方案在语法上很简单:

AccessQueue a;

没有更多的指针。现在,a是一个对象,&a是该对象的地址。

注意原来是UB,因为您将sizeof(AccessQueue)字节读取到指针中。然而,指针只有8个字节,而结构是128个字节。哎呀!

简化的工作演示

这项工作:

在Wandbox上直播

#include <boost/interprocess/ipc/message_queue.hpp>
#include <iostream>
#include <iomanip>
namespace bip = boost::interprocess;
using MQ = bip::message_queue;
template<size_t N>
static inline void safe_copy(std::array<char, N>& dst, std::string_view src) {
std::copy_n(src.data(), std::min(src.size(), N), dst.data());
dst.back() = 0; // make sure of NUL termination
}
struct AccessQueue {
std::array<char, 64> name{0};
std::array<char, 64> action{0};
AccessQueue(std::string_view n = "", std::string_view a = "") {
safe_copy(name, n);
safe_copy(action, a);
}
};
static_assert(std::is_standard_layout_v<AccessQueue>);
struct X {
void send() {
AccessQueue packet("asd", "vsq");
try {
mq.send(&packet, sizeof(packet), 0);
} catch(std::exception const & ex) {
std::cout << ex.what() << std::endl;
}
}
AccessQueue receive() {
AccessQueue retval;

report();
try {
unsigned int priority;
MQ::size_type recvd_size;
mq.try_receive(&retval, sizeof(AccessQueue), recvd_size, priority);
} catch(std::exception const & ex) {
std::cout << ex.what() << std::endl;
}
report();
return retval;
}
void report() {
std::cout << mq.get_num_msg() << "t" << mq.get_max_msg_size() << "t" << mq.get_max_msg() << std::endl;
}
MQ mq { bip::open_or_create, "somequeue", 10, sizeof(AccessQueue) };
};
int main() {
X tryit;
tryit.send();
auto const& [name, action] = tryit.receive();
std::cout << std::quoted(name.data()) << " " << std::quoted(action.data()) << std::endl;
}

打印

1       128     10
0       128     10
"asd" "vsq"

票据

  • 在C数组上使用std::array默认情况下提供复制语义
  • 保护AccessQueue的POD性
  • 确保成员已初始化
  • 确保副本安全
  • 确保副本始终以NUL结尾
  • 不要使用new或删除。为什么C++程序员应该尽量减少';新'
  • 确保接收缓冲区大小与maxmsgsize(boost interprocess message_queue和fork(匹配

禁止在Wandbox上共享emory:(

最新更新