有一个使用c++ 20协程切换到另一个线程的例子:
#include <coroutine>
#include <iostream>
#include <stdexcept>
#include <thread>
auto switch_to_new_thread(std::jthread& out) {
struct awaitable {
std::jthread* p_out;
bool await_ready() { return false; }
void await_suspend(std::coroutine_handle<> h) {
std::jthread& out = *p_out;
if (out.joinable())
throw std::runtime_error("Output jthread parameter not empty");
out = std::jthread([h] { h.resume(); });
// Potential undefined behavior: accessing potentially destroyed *this
// std::cout << "New thread ID: " << p_out->get_id() << 'n';
std::cout << "New thread ID: " << out.get_id() << 'n'; // this is OK
}
void await_resume() {}
};
return awaitable{ &out };
}
struct task {
struct promise_type {
task get_return_object() { return {}; }
std::suspend_never initial_suspend() { return {}; }
std::suspend_never final_suspend() noexcept { return {}; }
void return_void() {}
void unhandled_exception() {}
};
};
task resuming_on_new_thread(std::jthread& out) {
std::cout << "Coroutine started on thread: " << std::this_thread::get_id() << 'n';
co_await switch_to_new_thread(out);
// awaiter destroyed here
std::cout << "Coroutine resumed on thread: " << std::this_thread::get_id() << 'n';
}
int main() {
std::jthread out;
resuming_on_new_thread(out);
}
协程从主线程开始,切换到新创建的线程。
让它切换回主线程的正确方法是什么?
下面的代码
task resuming_on_new_thread(std::jthread& out) {
std::cout << "Coroutine started on thread: " << std::this_thread::get_id() << 'n';
co_await switch_to_new_thread(out);
// awaiter destroyed here
std::cout << "Coroutine resumed on thread: " << std::this_thread::get_id() << 'n';
co_await switch_to_main_thread();
std::cout << "Coroutine resumed on thread: " << std::this_thread::get_id() << 'n';
}
将打印
Coroutine started on thread: 139972277602112
New thread ID: 139972267284224
Coroutine resumed on thread: 139972267284224
Coroutine resumed on thread: 139972277602112
switch_to_new_thread
实际上创建了一个新线程,它没有切换到一个新线程。然后注入代码,恢复其中的协程。
要在特定线程上运行代码,必须在该线程上实际运行代码。要恢复协程,该特定线程必须运行恢复该协程的代码。
在这里,您通过创建一个全新的线程并注入执行resume
的代码来实现。
做这类事情的传统方法是使用消息泵。您想要参与的线程具有消息泵和事件队列。它按顺序运行事件。
为了使一个特定的线程运行一些代码,你向事件队列发送一个消息,其中包含指令(可能是实际的代码,也可能只是一个值)。
为此,这样一个"事件消费线程";大于一个std::jthread
或std::thread
;这是一个线程安全队列,线程中的一些人从队列中弹出任务并执行它们。
在这样的系统中,你可以通过发送消息在线程之间移动。
所以你有一个队列:
template<class T>
struct threadsafe_queue {
[[nodiscard]] std::optional<T> pop();
[[nodiscard]] std::deque<T> pop_many(std::optional<std::size_t> count = {}); // defaults to all
[[nodiscard]] bool push(T);
template<class C, class D>
[[nodiscard]] std::optional<T> wait_until_pop(std::chrono::time_point<C,D>);
void abort();
[[nodiscard]] bool is_aborted() const { return aborted; }
private:
mutable std::mutex m;
std::condition_variable cv;
std::deque<T> queue;
bool aborted = false;
auto lock() const { return std::unique_lock(m); }
};
任务:
using task_queue = threadsafe_queue<std::function<void()>>;
一个基本的消息泵是:
void message_pump( task_queue& q ) {
while (auto f = q.pop()) {
if (*f) (*f)();
}
}
然后创建两个task_queue,一个用于主线程,一个用于工作线程。要切换到worker而不是创建一个新的jthread
,您需要:
workerq.push( [&]{ h.resume(); } );
,类似地切换到主
mainq.push( [&]{ h.resume(); } );
有很多细节我已经跳过了,但这是你如何做的一个草图。
实现这一点的一种方法是有一个线程安全的队列,协程将自己置于该队列中,以告诉主线程"请立即恢复我"。此时,您基本上构建了一个线程池。main函数必须监视该队列(定期轮询它或等待将某些东西放入其中),然后在可用时获取并执行元素(工作项)。