我有一个程序有两个函数。一个是循环计时器,另一个是接收一些套接字。
我发现,如果在定时器长篇大论之前有多个包进入,那么boost将运行所有套接字句柄,然后运行定时器句柄。
我写了一个简单的代码来模拟这个时间,如下所示:
#include <iostream>
#include <memory>
#include <boost/asio.hpp>
#include <boost/thread/thread.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/asio/steady_timer.hpp>
std::string get_time()
{
struct timespec time_spec;
clock_gettime(CLOCK_REALTIME, &time_spec);
int h = (int)(time_spec.tv_sec / 60 / 60 % 24);
int m = (int)(time_spec.tv_sec / 60 % 60);
int s = (int)(time_spec.tv_sec % 60);
int ms = (int)(time_spec.tv_nsec / 1000);
char st[50];
snprintf(st, 50, "[%02d:%02d:%02d:%06d]", h, m, s, ms);
return std::string(st);
}
void fA()
{
std::cout << get_time() << " : fA()" << std::endl;
boost::this_thread::sleep(boost::posix_time::milliseconds(40));
}
void fB()
{
std::cout << get_time() << " : fB()" << std::endl;
boost::this_thread::sleep(boost::posix_time::milliseconds(20));
}
int main(int argc, char *argv[])
{
boost::asio::io_service io;
std::shared_ptr<boost::asio::io_service::work> work = std::make_shared<boost::asio::io_service::work>(io);
std::shared_ptr<boost::asio::steady_timer> t100ms = std::make_shared<boost::asio::steady_timer>(io);
std::shared_ptr<boost::asio::steady_timer> t80ms = std::make_shared<boost::asio::steady_timer>(io);
std::cout << get_time() << " : start" << std::endl;
t100ms->expires_from_now(std::chrono::milliseconds(100));
t80ms->expires_from_now(std::chrono::milliseconds(80));
t100ms->async_wait([&](const boost::system::error_code &_error) {
if(_error.value() == boost::system::errc::errc_t::success) {
std::cout << get_time() << " : t100ms" << std::endl;
}
});
t80ms->async_wait([&](const boost::system::error_code &_error) {
if(_error.value() == boost::system::errc::errc_t::success) {
std::cout << get_time() << " : t80ms" << std::endl;
io.post(fA);
io.post(fB);
}
});
io.run();
return 0;
}
此代码的用法是:
[08:15:40:482721] : start
[08:15:40:562867] : t80ms
[08:15:40:562925] : fA()
[08:15:40:603037] : fB()
[08:15:40:623186] : t100ms
但是,我想要的结果是:
[08:15:40:482721] : start
[08:15:40:562867] : t80ms
[08:15:40:562925] : fA()
[08:15:40:603037] : t100ms
[08:15:40:604037] : fB()
t100ms可以在fA和fB之间运行,该时间更接近于从开始后100ms的正确所需时间[08:15:40:582721
]。
我发现了一个Invocation示例,它给出了一个优先级队列的示例。
并尝试通过将我的代码添加到这个示例中来修改它。
...
timer.async_wait(pri_queue.wrap(42, middle_priority_handler));
std::shared_ptr<boost::asio::steady_timer> t100ms = std::make_shared<boost::asio::steady_timer>(io_service);
std::shared_ptr<boost::asio::steady_timer> t80ms = std::make_shared<boost::asio::steady_timer>(io_service);
std::cout << get_time() << " : start" << std::endl;
t100ms->expires_from_now(std::chrono::milliseconds(100));
t80ms->expires_from_now(std::chrono::milliseconds(80));
t100ms->async_wait(pri_queue.wrap(100, [&](const boost::system::error_code &_error) {
if(_error.value() == boost::system::errc::errc_t::success) {
std::cout << get_time() << " : t100ms" << std::endl;
}
}));
t80ms->async_wait(pri_queue.wrap(100, [&](const boost::system::error_code &_error) {
if(_error.value() == boost::system::errc::errc_t::success) {
std::cout << get_time() << " : t80ms" << std::endl;
io_service.post(pri_queue.wrap(0, fA));
io_service.post(pri_queue.wrap(0, fB));
}
}));
while (io_service.run_one())
...
但是,结果仍然没有如我所想。如下所示:
[08:30:13:868299] : start
High priority handler
Middle priority handler
Low priority handler
[08:30:13:948437] : t80ms
[08:30:13:948496] : fA()
[08:30:13:988606] : fB()
[08:30:14:008774] : t100ms
我哪里错了?
处理程序按照它们发布的顺序运行。
当80毫秒到期时,您会立即发布fA()
和fB()
。当然,它们将首先运行,因为t100ms仍处于挂起状态。
这是你的例子,但简化了很多:
在Coliru上直播
#include <iostream>
#include <boost/asio.hpp>
#include <thread>
using boost::asio::io_context;
using boost::asio::steady_timer;
using namespace std::chrono_literals;
namespace {
static auto now = std::chrono::system_clock::now;
static auto get_time = [start = now()]{
return "at " + std::to_string((now() - start)/1ms) + "ms:t";
};
void message(std::string msg) {
std::cout << (get_time() + msg + "n") << std::flush; // minimize mixing output from threads
}
auto make_task = [](auto name, auto duration) {
return [=] {
message(name);
std::this_thread::sleep_for(duration);
};
};
}
int main() {
io_context io;
message("start");
steady_timer t100ms(io, 100ms);
t100ms.async_wait([&](auto ec) {
message("t100ms " + ec.message());
});
steady_timer t80ms(io, 80ms);
t80ms.async_wait([&](auto ec) {
message("t80ms " + ec.message());
post(io, make_task("task A", 40ms));
post(io, make_task("task B", 20ms));
});
io.run();
}
打印
at 0ms: start
at 80ms: t80ms Success
at 80ms: task A
at 120ms: task B
at 140ms: t100ms Success
一种方法
假设您确实在尝试为操作计时,请考虑运行多个线程。通过这三个字的更改,输出为:
at 1ms: start
at 81ms: t80ms Success
at 81ms: task A
at 82ms: task B
at 101ms: t100ms Success
要继续序列化A和B,请通过更改将它们发布在一条链上
post(io, make_task("task A", 40ms));
post(io, make_task("task B", 20ms));
至
auto s = make_strand(io);
post(s, make_task("task A", 40ms));
post(s, make_task("task B", 20ms));
现在打印
at 0ms: start
at 80ms: t80ms Success
at 80ms: task A
at 100ms: t100ms Success
at 120ms: task B
(完整列表如下(。
请不要线程
另一种方法是,当您不希望使用线程(例如,为了简单/安全(时,您确实需要一个队列。我会考虑简单地写出来:
struct Queue {
template <typename Ctx>
Queue(Ctx context) : strand(make_strand(context)) {}
void add(Task f) {
post(strand, [this, f=std::move(f)] {
if (tasks.empty())
run();
tasks.push_back(std::move(f));
});
}
private:
boost::asio::any_io_executor strand;
std::deque<Task> tasks;
void run() {
post(strand, [this] { drain_loop(); });
}
void drain_loop() {
if (tasks.empty()) {
message("queue empty");
} else {
tasks.front()(); // invoke task
tasks.pop_front();
run();
}
}
};
现在我们可以安全地选择是否在线程上下文中使用它,因为所有队列操作都在一个链上。
int main() {
thread_pool io; // or io_context io;
Queue tasks(io.get_executor());
message("start");
steady_timer t100ms(io, 100ms);
t100ms.async_wait([&](auto ec) {
message("t100ms " + ec.message());
});
steady_timer t80ms(io, 80ms);
t80ms.async_wait([&](auto ec) {
message("t80ms " + ec.message());
tasks.add(make_task("task A", 40ms));
tasks.add(make_task("task B", 40ms));
});
io.join(); // or io.run()
}
使用thread_pool io;
:
at 0ms: start
at 80ms: t80ms Success
at 80ms: task A
at 100ms: t100ms Success
at 120ms: task B
at 160ms: queue empty
使用io_context io;
(当然是thread_pool io(1);
(:
at 0ms: start
at 80ms: t80ms Success
at 80ms: task A
at 120ms: task B
at 160ms: t100ms Success
at 160ms: queue empty