I've run into a problem with boost::fiber. My code is based on the boost::fiber "work_stealing.cpp" example, dressed up a little. It runs fine on Windows Subsystem for Linux (Ubuntu), in both debug and release builds. In fact, until last night it also worked in the Windows Visual Studio build. But today we tried to run some tests, and a BOOST_ASSERT failure was raised in the debug build. The release build still works...
I have no idea why... Does anyone know anything about this? Why does it happen only in the Windows debug build? What am I doing wrong?
I'm using CMake as the build tool and Visual Studio 2019 Community as the IDE. I also test on WSL Ubuntu 20.04 and macOS 10.15.x (can't remember exactly...).
Thanks.
- 泉
The assertion failure is raised from the following Boost code:
// <boost>/lib/fiber/src/scheduler.cpp
...
void
scheduler::detach_worker_context( context * ctx) noexcept {
BOOST_ASSERT( nullptr != ctx);
BOOST_ASSERT( ! ctx->ready_is_linked() );
#if ! defined(BOOST_FIBERS_NO_ATOMICS)
BOOST_ASSERT( ! ctx->remote_ready_is_linked() );
#endif
BOOST_ASSERT( ! ctx->sleep_is_linked() );
BOOST_ASSERT( ! ctx->terminated_is_linked() );
BOOST_ASSERT( ! ctx->wait_is_linked() ); // <-- [THE ASSERTION FAILS HERE!]
BOOST_ASSERT( ctx->worker_is_linked() );
BOOST_ASSERT( ! ctx->is_context( type::pinned_context) );
ctx->worker_unlink();
BOOST_ASSERT( ! ctx->worker_is_linked() );
ctx->scheduler_ = nullptr;
// a detached context must not belong to any queue
}
...
My code looks like this (I've removed some irrelevant parts)...
//coroutine.h
class CoroutineManager {
...
typedef std::unique_lock<std::mutex> lock_type;
typedef boost::fibers::algo::work_stealing scheduler_algorithm;
typedef boost::fibers::default_stack stack_type;
...
void wait() {
lock_type shutdown_lock(shutdown_mutex);
shutdown_condition.wait(shutdown_lock, [this]() { return 1 == this->shutdown_flag; });
BOOST_ASSERT(1 == this->shutdown_flag);
}
void shutdown(bool wait = true) {
lock_type shutdown_lock(shutdown_mutex);
this->shutdown_flag = 1;
shutdown_lock.unlock();
this->shutdown_condition.notify_all();
if (wait) {
for (std::thread& t : threads) {
if (t.joinable()) {
t.join();
}
}
}
}
...
template <class TaskType>
void submit(const TaskType& task) {
boost::fibers::fiber(boost::fibers::launch::dispatch, std::allocator_arg, stack_type(this->stack_size), task).detach();
}
...
void init() {
if (verbose) spdlog::info("INIT THREAD({0}) STARTED.", std::this_thread::get_id());
for (int i = 0; i < (this->thread_count - 1); ++i) {
threads.push_back(std::thread(&CoroutineManager::thread, this));
}
boost::fibers::use_scheduling_algorithm<scheduler_algorithm>(this->thread_count); // the init thread joins the scheduler as one of the thread_count workers
this->start_barrier.wait();
}
void dispose() {
if (verbose) spdlog::info("INIT THREAD({0}) DISPOSED.", std::this_thread::get_id());
}
void thread() {
if(verbose) spdlog::info("WORKER THREAD({0}) STARTED.", std::this_thread::get_id());
boost::fibers::use_scheduling_algorithm<scheduler_algorithm>(this->thread_count);
this->start_barrier.wait();
this->wait(); // park this worker until shutdown() is signalled
if (verbose) spdlog::info("WORKER THREAD({0}) DISPOSED.", std::this_thread::get_id());
}
...
};
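(I didn't include the member declarations above; roughly, they look like this — a sketch reconstructed from usage and from the work_stealing.cpp example this code is based on, not my exact code:)
// Hypothetical declarations, inferred from usage -- not the actual elided code:
std::mutex shutdown_mutex;
boost::fibers::condition_variable_any shutdown_condition; // fiber-aware CV, as in the example; a std::condition_variable would also fit the calls above
int shutdown_flag = 0;                // guarded by shutdown_mutex
std::vector<std::thread> threads;     // the worker threads spawned in init()
boost::fibers::barrier start_barrier; // presumably constructed with thread_count
std::size_t thread_count;
std::size_t stack_size;
bool verbose;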
- - - - - - - - - - - - - - - - - - - - - - -
// coroutine_test.cpp
...
int main(int argc, char* argv[]) {
init_file_logger("coroutine_test.log");
time_tracker tracker("[coroutine_test.main]");
typedef std::thread::id tid_t;
typedef boost::fibers::fiber::id fid_t;
typedef boost::fibers::buffered_channel<std::tuple<tid_t, fid_t, int>> channel_t;
CoroutineManager manager(std::thread::hardware_concurrency(), 1024 * 1024 * 8, true);
const int N = 1;
channel_t chan { 8 };
boost::fibers::barrier done_flag(2);
manager.submit([&chan, &done_flag]() {
std::tuple<tid_t, fid_t, int> p;
while( boost::fibers::channel_op_status::success == chan.pop_wait_for(p, std::chrono::milliseconds(100))) {
spdlog::info("[thead({0}) : fiber({1}) from {2}]", std::get<0>(p), std::get<1>(p), std::get<2>(p));
}
done_flag.wait();
});
for (int i = 0; i < N; ++i) {
manager.submit([&chan, i]() {
for (int j = 0; j < 1000; ++j) {
chan.push(std::make_tuple(std::this_thread::get_id(), boost::this_fiber::get_id(), i)); // make_tuple is already an rvalue, no std::move needed
boost::this_fiber::yield();
}
});
}
done_flag.wait();
spdlog::info("-START TO SHUTDOWN-");
manager.shutdown();
spdlog::info("-END-");
return 0;
}
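(Aside: formatting std::thread::id and fiber ids with spdlog's {} placeholders needs ostream support pulled in somewhere; I'm assuming an include along these lines — adjust for your spdlog/fmt version:)
#include <spdlog/spdlog.h>
#include <spdlog/fmt/ostr.h> // lets spdlog format anything with an operator<<, e.g. std::thread::id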
[UPDATE] Adding a snapshot to explain my situation more clearly...
I think I've found how it happens (though not the root cause):
The failing assertion occurs with a buffered_channel driven by the push/pop_wait_for pair. If you switch to the try_push/pop pair instead, the assertion failure goes away.
I suspect pop_wait_for, but the stack trace is too vague for me to work it out, and I'm no Boost expert. Hopefully someone else can get to the bottom of it.
So my code now looks like this:
manager.submit([&chan, &done_flag]() {
std::tuple<tid_t, fid_t, int> p;
for (;;) {
boost::fibers::channel_op_status status = chan.pop(p);
if (boost::fibers::channel_op_status::success == status) {
spdlog::info("[thead({0}) : fiber({1}) from {2}]", std::get<0>(p), std::get<1>(p), std::get<2>(p));
} else if (boost::fibers::channel_op_status::closed == status) {
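// note: nothing in this snippet calls chan.close(); this branch fires only once something does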
spdlog::info("[ off loop ]");
break;
}
boost::this_fiber::yield();
}
});
for (int i = 0; i < N; ++i) {
manager.submit([&chan, i]() {
for (int j = 0; j < 1000; ++j) {
boost::fibers::channel_op_status status = chan.try_push(std::make_tuple(std::this_thread::get_id(), boost::this_fiber::get_id(), i));
spdlog::warn("[ worker ] status : {0}", (int)status);
}
});
}
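One caveat with this workaround: try_push never blocks, and returns channel_op_status::full when the channel is full, so pushes can get dropped. If that matters, here is a minimal sketch of a producer that retries with a yield instead (same channel and counts as above):
manager.submit([&chan, i]() {
    for (int j = 0; j < 1000; ++j) {
        auto item = std::make_tuple(std::this_thread::get_id(), boost::this_fiber::get_id(), i);
        // spin on try_push, yielding to other fibers while the channel is full
        while (boost::fibers::channel_op_status::full == chan.try_push(item)) {
            boost::this_fiber::yield();
        }
    }
});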
instead of the original:
manager.submit([&chan, &done_flag]() {
std::tuple<tid_t, fid_t, int> p;
while( boost::fibers::channel_op_status::success == chan.pop_wait_for(p, std::chrono::milliseconds(100))) {
spdlog::info("[thead({0}) : fiber({1}) from {2}]", std::get<0>(p), std::get<1>(p), std::get<2>(p));
}
});
for (int i = 0; i < N; ++i) {
manager.submit([&chan, i]() {
for (int j = 0; j < 1000; ++j) {
chan.push(std::make_tuple(std::this_thread::get_id(), boost::this_fiber::get_id(), i));
}
});
}
Hope this helps anyone who runs into the same situation.