如何将boost io_service与优先级队列一起使用



我有一个程序有两个函数。一个是循环计时器,另一个是接收一些套接字。

我发现,如果在定时器长篇大论之前有多个包进入,那么boost将运行所有套接字句柄,然后运行定时器句柄。

我写了一个简单的代码来模拟这个时间,如下所示:

#include <iostream>
#include <memory>
#include <boost/asio.hpp>
#include <boost/thread/thread.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/asio/steady_timer.hpp>
std::string get_time()
{
struct timespec time_spec;
clock_gettime(CLOCK_REALTIME, &time_spec);
int h  = (int)(time_spec.tv_sec / 60 / 60 % 24);
int m  = (int)(time_spec.tv_sec / 60 % 60);
int s  = (int)(time_spec.tv_sec % 60);
int ms = (int)(time_spec.tv_nsec / 1000);
char st[50];
snprintf(st, 50, "[%02d:%02d:%02d:%06d]", h, m, s, ms);
return std::string(st);
}
void fA()
{
std::cout << get_time() << " : fA()" << std::endl;
boost::this_thread::sleep(boost::posix_time::milliseconds(40));
}
void fB()
{
std::cout << get_time() << " : fB()" << std::endl;
boost::this_thread::sleep(boost::posix_time::milliseconds(20));
}
int main(int argc, char *argv[])
{
boost::asio::io_service io;
std::shared_ptr<boost::asio::io_service::work> work = std::make_shared<boost::asio::io_service::work>(io);
std::shared_ptr<boost::asio::steady_timer> t100ms = std::make_shared<boost::asio::steady_timer>(io);
std::shared_ptr<boost::asio::steady_timer> t80ms = std::make_shared<boost::asio::steady_timer>(io);
std::cout << get_time() << " : start" << std::endl;
t100ms->expires_from_now(std::chrono::milliseconds(100));
t80ms->expires_from_now(std::chrono::milliseconds(80));
t100ms->async_wait([&](const boost::system::error_code &_error) {
if(_error.value() == boost::system::errc::errc_t::success) {
std::cout << get_time() << " : t100ms" << std::endl;
}
});
t80ms->async_wait([&](const boost::system::error_code &_error) {
if(_error.value() == boost::system::errc::errc_t::success) {
std::cout << get_time() << " : t80ms" << std::endl;
io.post(fA);
io.post(fB);
}
});
io.run();
return 0;
}

此代码的用法是:

[08:15:40:482721] : start
[08:15:40:562867] : t80ms
[08:15:40:562925] : fA()
[08:15:40:603037] : fB()
[08:15:40:623186] : t100ms

但是,我想要的结果是:

[08:15:40:482721] : start
[08:15:40:562867] : t80ms
[08:15:40:562925] : fA()
[08:15:40:603037] : t100ms
[08:15:40:604037] : fB()

t100ms可以在fA和fB之间运行,该时间更接近于从开始后100ms的正确所需时间[08:15:40:582721]。

我发现了一个Invocation示例,它给出了一个优先级队列的示例。

并尝试通过将我的代码添加到这个示例中来修改它。

...
timer.async_wait(pri_queue.wrap(42, middle_priority_handler));

std::shared_ptr<boost::asio::steady_timer> t100ms = std::make_shared<boost::asio::steady_timer>(io_service);
std::shared_ptr<boost::asio::steady_timer> t80ms = std::make_shared<boost::asio::steady_timer>(io_service);
std::cout << get_time() << " : start" << std::endl;
t100ms->expires_from_now(std::chrono::milliseconds(100));
t80ms->expires_from_now(std::chrono::milliseconds(80));
t100ms->async_wait(pri_queue.wrap(100, [&](const boost::system::error_code &_error) {
if(_error.value() == boost::system::errc::errc_t::success) {
std::cout << get_time() << " : t100ms" << std::endl;
}
}));
t80ms->async_wait(pri_queue.wrap(100, [&](const boost::system::error_code &_error) {
if(_error.value() == boost::system::errc::errc_t::success) {
std::cout << get_time() << " : t80ms" << std::endl;
io_service.post(pri_queue.wrap(0, fA));
io_service.post(pri_queue.wrap(0, fB));
}
}));
while (io_service.run_one())
...

但是,结果仍然没有如我所想。如下所示:

[08:30:13:868299] : start
High priority handler
Middle priority handler
Low priority handler
[08:30:13:948437] : t80ms
[08:30:13:948496] : fA()
[08:30:13:988606] : fB()
[08:30:14:008774] : t100ms

我哪里错了?

处理程序按照它们发布的顺序运行。

当80毫秒到期时,您会立即发布fA()fB()。当然,它们将首先运行,因为t100ms仍处于挂起状态。

这是你的例子,但简化了很多:

在Coliru上直播

#include <iostream>
#include <boost/asio.hpp>
#include <thread>
using boost::asio::io_context;
using boost::asio::steady_timer;
using namespace std::chrono_literals;
namespace {
static auto now = std::chrono::system_clock::now;
static auto get_time = [start = now()]{
return "at " + std::to_string((now() - start)/1ms) + "ms:t";
};
void message(std::string msg) {
std::cout << (get_time() + msg + "n") << std::flush; // minimize mixing output from threads
}
auto make_task = [](auto name, auto duration) {
return [=] {
message(name);
std::this_thread::sleep_for(duration);
};
};
}
int main() {
io_context io;
message("start");
steady_timer t100ms(io, 100ms);
t100ms.async_wait([&](auto ec) {
message("t100ms " + ec.message());
});
steady_timer t80ms(io, 80ms);
t80ms.async_wait([&](auto ec) {
message("t80ms " + ec.message());
post(io, make_task("task A", 40ms));
post(io, make_task("task B", 20ms));
});
io.run();
}

打印

at 0ms: start
at 80ms:        t80ms Success
at 80ms:        task A
at 120ms:       task B
at 140ms:       t100ms Success

一种方法

假设您确实在尝试为操作计时,请考虑运行多个线程。通过这三个字的更改,输出为:

at 1ms: start
at 81ms:    t80ms Success
at 81ms:    task A
at 82ms:    task B
at 101ms:   t100ms Success

要继续序列化A和B,请通过更改将它们发布在一条链上

post(io, make_task("task A", 40ms));
post(io, make_task("task B", 20ms));

auto s = make_strand(io);
post(s, make_task("task A", 40ms));
post(s, make_task("task B", 20ms));

现在打印

at 0ms: start
at 80ms:        t80ms Success
at 80ms:        task A
at 100ms:       t100ms Success
at 120ms:       task B

(完整列表如下(。

请不要线程

另一种方法是,当您不希望使用线程(例如,为了简单/安全(时,您确实需要一个队列。我会考虑简单地写出来:

struct Queue {
template <typename Ctx>
Queue(Ctx context) : strand(make_strand(context)) {}
void add(Task f) {
post(strand, [this, f=std::move(f)] {
if (tasks.empty())
run();
tasks.push_back(std::move(f));
});
}
private:
boost::asio::any_io_executor strand;
std::deque<Task> tasks;
void run() {
post(strand, [this] { drain_loop(); });
}
void drain_loop() {
if (tasks.empty()) {
message("queue empty");
} else {
tasks.front()(); // invoke task
tasks.pop_front();
run();
}
}
};

现在我们可以安全地选择是否在线程上下文中使用它,因为所有队列操作都在一个链上。

int main() {
thread_pool io; // or io_context io;
Queue tasks(io.get_executor());
message("start");
steady_timer t100ms(io, 100ms);
t100ms.async_wait([&](auto ec) {
message("t100ms " + ec.message());
});
steady_timer t80ms(io, 80ms);
t80ms.async_wait([&](auto ec) {
message("t80ms " + ec.message());
tasks.add(make_task("task A", 40ms));
tasks.add(make_task("task B", 40ms));
});
io.join(); // or io.run()
}

使用thread_pool io;:

at 0ms: start
at 80ms:        t80ms Success
at 80ms:        task A
at 100ms:       t100ms Success
at 120ms:       task B
at 160ms:       queue empty

使用io_context io;(当然是thread_pool io(1);(:

at 0ms: start
at 80ms:        t80ms Success
at 80ms:        task A
at 120ms:       task B
at 160ms:       t100ms Success
at 160ms:       queue empty

最新更新