在尝试使用boost时断言.跨很多过程中的互动消息队列



i具有以下简化程序,该程序旋转一堆子进程,然后使用boost::interprocess::message_queue向每个消息发送一条消息。当流程数量很少时(在我的机器上大约4个(时,这起作用了,但是随着这个数字的增加,我会收到以下消息:

head (81473): "./a.out"
Assertion failed: (res == 0), function do_wait, file /usr/local/include/boost/interprocess/sync/posix/condition.hpp, line 175.

我猜这是我的同步的问题。.我做错了什么还是boost::interprocess::scoped_lock不够?

我的程序在这里:

#include <boost/interprocess/ipc/message_queue.hpp>
#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <boost/process.hpp>
#include <iostream>
auto main(int argc, char **argv) -> int
{
    namespace ip = boost::interprocess;
    boost::filesystem::path self{argv[0]};
    if (argc == 1) {
        std::cout << "head (" << ::getpid() << "): " << self << std::endl;
        // create a message queue.
        ip::message_queue::remove("work_queue");
        ip::message_queue tasks{ip::create_only, "work_queue", 100, sizeof(int)};
        // mutex for writing to the queue.
        ip::interprocess_mutex mutex{};
        // spawn off a bunch of processes.
        const auto cores{5 * std::thread::hardware_concurrency()};
        std::vector<boost::process::child> workers{};
        for (auto i = 0; i < cores; ++i) {
            workers.emplace_back(self, "child");
        }
        // send message to each core.
        for (auto i = 0; i < cores; ++i) {
            ip::scoped_lock<decltype(mutex)> lock{mutex};
            tasks.send(&i, sizeof(i), 0);
        }
        // wait for each process to finish.
        for (auto &worker : workers) {
            worker.wait();
        }
    } else if (argc == 2 && std::strcmp(argv[1], "child") == 0) {
        // connect to message queue.
        ip::message_queue tasks{ip::open_only, "work_queue"};
        // mutex for reading from the queue.
        ip::interprocess_mutex mutex{};
        unsigned int priority;
        ip::message_queue::size_type recvd_size;
        {
            ip::scoped_lock<decltype(mutex)> lock{mutex};
            int number;
            tasks.receive(&number, sizeof(number), recvd_size, priority);
            std::cout << "child (" << ::getpid() << "): " << self << ", received: " << number << std::endl;
        }
    }
    return 0;
}

您在堆栈上创建一个interprocess_mutex实例。因此,每个过程都有自己的静音,锁定它不会同步任何东西。您需要创建一个共享的内存区域,将Mutex放在那里,然后在子进程中打开相同的共享内存区域,以访问由父进程创建的静音。

相关内容

  • 没有找到相关文章

最新更新