i具有以下简化程序,该程序旋转一堆子进程,然后使用boost::interprocess::message_queue
向每个消息发送一条消息。当流程数量很少时(在我的机器上大约4个(时,这起作用了,但是随着这个数字的增加,我会收到以下消息:
head (81473): "./a.out"
Assertion failed: (res == 0), function do_wait, file /usr/local/include/boost/interprocess/sync/posix/condition.hpp, line 175.
我猜这是我的同步的问题。.我做错了什么还是boost::interprocess::scoped_lock
不够?
我的程序在这里:
#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <boost/process.hpp>
#include <iostream>
auto main(int argc, char **argv) -> int
{
namespace ip = boost::interprocess;
boost::filesystem::path self{argv[0]};
if (argc == 1) {
std::cout << "head (" << ::getpid() << "): " << self << std::endl;
// create a message queue.
ip::message_queue::remove("work_queue");
ip::message_queue tasks{ip::create_only, "work_queue", 100, sizeof(int)};
// mutex for writing to the queue.
ip::interprocess_mutex mutex{};
// spawn off a bunch of processes.
const auto cores{5 * std::thread::hardware_concurrency()};
std::vector<boost::process::child> workers{};
for (auto i = 0; i < cores; ++i) {
workers.emplace_back(self, "child");
}
// send message to each core.
for (auto i = 0; i < cores; ++i) {
ip::scoped_lock<decltype(mutex)> lock{mutex};
tasks.send(&i, sizeof(i), 0);
}
// wait for each process to finish.
for (auto &worker : workers) {
worker.wait();
}
} else if (argc == 2 && std::strcmp(argv[1], "child") == 0) {
// connect to message queue.
ip::message_queue tasks{ip::open_only, "work_queue"};
// mutex for reading from the queue.
ip::interprocess_mutex mutex{};
unsigned int priority;
ip::message_queue::size_type recvd_size;
{
ip::scoped_lock<decltype(mutex)> lock{mutex};
int number;
tasks.receive(&number, sizeof(number), recvd_size, priority);
std::cout << "child (" << ::getpid() << "): " << self << ", received: " << number << std::endl;
}
}
return 0;
}
您在堆栈上创建一个interprocess_mutex
实例。因此,每个过程都有自己的静音,锁定它不会同步任何东西。您需要创建一个共享的内存区域,将Mutex放在那里,然后在子进程中打开相同的共享内存区域,以访问由父进程创建的静音。