使用c++ 20协程在线程之间切换



有一个使用c++ 20协程切换到另一个线程的例子:

#include <coroutine>
#include <iostream>
#include <stdexcept>
#include <thread>
auto switch_to_new_thread(std::jthread& out) {
struct awaitable {
std::jthread* p_out;
bool await_ready() { return false; }
void await_suspend(std::coroutine_handle<> h) {
std::jthread& out = *p_out;
if (out.joinable())
throw std::runtime_error("Output jthread parameter not empty");
out = std::jthread([h] { h.resume(); });
// Potential undefined behavior: accessing potentially destroyed *this
// std::cout << "New thread ID: " << p_out->get_id() << 'n';
std::cout << "New thread ID: " << out.get_id() << 'n'; // this is OK
}
void await_resume() {}
};
return awaitable{ &out };
}
struct task {
struct promise_type {
task get_return_object() { return {}; }
std::suspend_never initial_suspend() { return {}; }
std::suspend_never final_suspend() noexcept { return {}; }
void return_void() {}
void unhandled_exception() {}
};
};
task resuming_on_new_thread(std::jthread& out) {
std::cout << "Coroutine started on thread: " << std::this_thread::get_id() << 'n';
co_await switch_to_new_thread(out);
// awaiter destroyed here
std::cout << "Coroutine resumed on thread: " << std::this_thread::get_id() << 'n';
}
int main() {
std::jthread out;
resuming_on_new_thread(out);
}

协程从主线程开始,切换到新创建的线程。

让它切换回主线程的正确方法是什么?

下面的代码

task resuming_on_new_thread(std::jthread& out) {
std::cout << "Coroutine started on thread: " << std::this_thread::get_id() << 'n';
co_await switch_to_new_thread(out);
// awaiter destroyed here
std::cout << "Coroutine resumed on thread: " << std::this_thread::get_id() << 'n';
co_await switch_to_main_thread();
std::cout << "Coroutine resumed on thread: " << std::this_thread::get_id() << 'n';
}

将打印

Coroutine started on thread: 139972277602112
New thread ID: 139972267284224
Coroutine resumed on thread: 139972267284224
Coroutine resumed on thread: 139972277602112

switch_to_new_thread实际上创建了一个新线程,它没有切换到一个新线程。然后注入代码,恢复其中的协程。

要在特定线程上运行代码,必须在该线程上实际运行代码。要恢复协程,该特定线程必须运行恢复该协程的代码。

在这里,您通过创建一个全新的线程并注入执行resume的代码来实现。


做这类事情的传统方法是使用消息泵。您想要参与的线程具有消息泵和事件队列。它按顺序运行事件。

为了使一个特定的线程运行一些代码,你向事件队列发送一个消息,其中包含指令(可能是实际的代码,也可能只是一个值)。

为此,这样一个"事件消费线程";大于一个std::jthreadstd::thread;这是一个线程安全队列,线程中的一些人从队列中弹出任务并执行它们。

在这样的系统中,你可以通过发送消息在线程之间移动。

所以你有一个队列:

template<class T>
struct threadsafe_queue {
[[nodiscard]] std::optional<T> pop();
[[nodiscard]] std::deque<T> pop_many(std::optional<std::size_t> count = {}); // defaults to all
[[nodiscard]] bool push(T);
template<class C, class D>
[[nodiscard]] std::optional<T> wait_until_pop(std::chrono::time_point<C,D>);
void abort();
[[nodiscard]] bool is_aborted() const { return aborted; }
private:
mutable std::mutex m;
std::condition_variable cv;
std::deque<T> queue;
bool aborted = false;
auto lock() const { return std::unique_lock(m); }
};

任务:

using task_queue = threadsafe_queue<std::function<void()>>;

一个基本的消息泵是:

void message_pump( task_queue& q ) {
while (auto f = q.pop()) {
if (*f) (*f)();
}
}

然后创建两个task_queue,一个用于主线程,一个用于工作线程。要切换到worker而不是创建一个新的jthread,您需要:

workerq.push( [&]{ h.resume(); } );

,类似地切换到主

mainq.push( [&]{ h.resume(); } );

有很多细节我已经跳过了,但这是你如何做的一个草图。

实现这一点的一种方法是有一个线程安全的队列,协程将自己置于该队列中,以告诉主线程"请立即恢复我"。此时,您基本上构建了一个线程池。main函数必须监视该队列(定期轮询它或等待将某些东西放入其中),然后在可用时获取并执行元素(工作项)。

最新更新