如何在C++中创建高效的多线程任务计划程序



我想在C++创建一个非常高效的任务调度系统。

基本思想是这样的:

class Task {
public:
virtual void run() = 0;
};
class Scheduler {
public:
void add(Task &task, double delayToRun);
};

Scheduler后面,应该有一个固定大小的线程池,用于运行任务(我不想为每个任务创建一个线程)。delayToRun意味着task不会立即执行,而是在delayToRun秒后执行(从添加到Scheduler中开始测量)。

(当然,delayToRun意味着"至少"值。如果系统已加载,或者如果我们从调度程序询问不可能的事情,它将无法处理我们的请求。但它应该尽力而为)

这是我的问题。如何有效地实现delayToRun功能?我正在尝试通过使用互斥体和条件变量来解决这个问题。

我看到两种方式:

使用管理器线程

调度程序包含两个队列:allTasksQueuetasksReadyToRunQueue。任务在Scheduler::add时添加到allTasksQueue中。有一个管理器线程,它等待的时间最短,因此它可以将任务从allTasksQueue放到tasksReadyToRunQueue.工作线程等待tasksReadyToRunQueue中可用的任务。

如果Scheduler::addallTasksQueue前面添加一个任务(一个任务,其值为delayToRun,因此它应该在当前最快运行的任务之前),则需要唤醒管理器任务,以便它可以更新等待时间。

这种方法可以被认为是低效的,因为它需要两个队列,并且需要两个 condvar.signal 来使任务运行(一个用于allTasksQueue->tasksReadyToRunQueue,另一个用于向工作线程发出实际运行任务的信号)

无管理器线程

调度程序中有一个队列。任务将在Scheduler::add添加到此队列中。工作线程检查队列。如果为空,则在没有时间限制的情况下等待。如果它不为空,则等待最快的任务。

  1. 如果只有一个条件变量需要工作线程等待:则可以认为此方法效率低下,因为如果在队列前面添加了任务(front 意味着,如果有 N 个工作线程,则任务索引<N),则需要唤醒>所有工作线程以更新它们正在等待的时间。

  2. 如果每个线程都有一个单独的条件变量,那么我们可以控制唤醒哪个线程,所以在这种情况下我们不需要唤醒所有线程(我们只需要唤醒等待时间最长的线程,所以我们需要管理这个值)。我目前正在考虑实现这一点,但制定确切的细节很复杂。对这种方法有什么建议/想法/文件吗?


这个问题有更好的解决方案吗?我正在尝试使用标准的C++功能,但我愿意使用依赖于平台(我的主要平台是 linux)的工具(如 pthreads),甚至是特定于 Linux 的工具(如 futexes),如果它们提供更好的解决方案。

通过使用一种设计,即单个池线程在一个条件变量上等待"下一个运行"任务(如果有),而其余池线程无限期地等待第二个条件变量,可以避免具有单独的"管理器"线程,以及在下一个要运行的任务更改时必须唤醒大量任务。

池线程将按照以下行执行伪代码:

pthread_mutex_lock(&queue_lock);
while (running)
{
if (head task is ready to run)
{
dequeue head task;
if (task_thread == 1)
pthread_cond_signal(&task_cv);
else
pthread_cond_signal(&queue_cv);
pthread_mutex_unlock(&queue_lock);
run dequeued task;
pthread_mutex_lock(&queue_lock);
}
else if (!queue_empty && task_thread == 0)
{
task_thread = 1;
pthread_cond_timedwait(&task_cv, &queue_lock, time head task is ready to run);
task_thread = 0;
}
else
{
pthread_cond_wait(&queue_cv, &queue_lock);
}
}
pthread_mutex_unlock(&queue_lock);

如果将下一个任务更改为运行,则执行:

if (task_thread == 1)
pthread_cond_signal(&task_cv);
else
pthread_cond_signal(&queue_cv);

queue_lock举行。

在此方案下,所有唤醒都直接位于单个线程上,只有一个任务优先级队列,并且不需要管理器线程。

你的规范有点太强了:

delayToRun意味着任务不会立即执行,而是在delayToRun秒后执行

您忘了添加"至少":

  • 任务现在不会执行,但至少delayToRun秒后执行

关键是,如果一万个任务都以0.1延迟ToRun进行调度,那么它们实际上肯定无法同时运行。

通过这样的更正,您只需维护一些队列(或议程)(计划的开始时间,要运行的关闭),保持该队列的排序,然后开始N(一些固定数量)线程,这些线程原子地弹出议程的第一个元素并运行它。

然后需要唤醒所有工作线程以更新它们正在等待的时间。

否,某些工作线程将被唤醒。

阅读有关条件变量和广播的信息。

您也可以使用 POSIX 定时器(参见 timer_create(2),或 Linux 特定的 fd 定时器,参见 timerfd_create(2)

你可能会避免在你的线程中运行阻塞系统调用,并让一些中心线程使用一些事件循环来管理它们(参见 poll(2)...);否则,如果你有一百个任务sleep(100)运行,一个任务计划在半秒内运行,它不会在一百秒之前运行。

您可能想阅读有关延续传递样式编程的信息(它 -CPS- 非常相关)。阅读Juliusz Chroboczek关于Continuation Pass C的论文。

还要研究Qt线程。

你也可以考虑用 Go 编码(及其 Goroutines)。

这是您提供的接口的示例实现,最接近您的">使用管理器线程"描述。

它使用单个线程(timer_thread)来管理队列(allTasksQueue),该队列根据必须启动任务的实际时间(std::chrono::time_point)进行排序。
"队列"是一个std::priority_queue(它保持其time_point关键元素的排序)。

timer_thread通常挂起,直到启动下一个任务或添加新任务。
当一个任务即将运行时,它被放置在tasksReadyToRunQueue中,其中一个工作线程发出信号,唤醒,将其从队列中删除并开始处理任务。

请注意,线程池具有线程数的编译时上限 (40)。如果您计划的任务多于可以分派给工作人员的任务, 新任务将阻塞,直到线程再次可用。

你说这种方法效率不高,但总的来说,在我看来它似乎相当有效。这一切都是事件驱动的,您不会因不必要的旋转而浪费 CPU 周期。 当然,这只是一个例子,优化是可能的(注意:std::multimap已被替换为std::priority_queue)。

实现符合 C++11 标准

#include <iostream>
#include <chrono>
#include <queue>
#include <unistd.h>
#include <vector>
#include <thread>
#include <condition_variable>
#include <mutex>
#include <memory>
class Task {
public:
virtual void run() = 0;
virtual ~Task() { }
};
class Scheduler {
public:
Scheduler();
~Scheduler();
void add(Task &task, double delayToRun);
private:
using timepoint = std::chrono::time_point<std::chrono::steady_clock>;
struct key {
timepoint tp;
Task *taskp;
};
struct TScomp {
bool operator()(const key &a, const key &b) const
{
return a.tp > b.tp;
}
};
const int ThreadPoolSize = 40;
std::vector<std::thread> ThreadPool;
std::vector<Task *> tasksReadyToRunQueue;
std::priority_queue<key, std::vector<key>, TScomp> allTasksQueue;
std::thread TimerThr;
std::mutex TimerMtx, WorkerMtx;
std::condition_variable TimerCV, WorkerCV;
bool WorkerIsRunning = true;
bool TimerIsRunning = true;
void worker_thread();
void timer_thread();
};
Scheduler::Scheduler()
{
for (int i = 0; i <ThreadPoolSize; ++i)
ThreadPool.push_back(std::thread(&Scheduler::worker_thread, this));
TimerThr = std::thread(&Scheduler::timer_thread, this);
}
Scheduler::~Scheduler()
{
{
std::lock_guard<std::mutex> lck{TimerMtx};
TimerIsRunning = false;
TimerCV.notify_one();
}
TimerThr.join();
{
std::lock_guard<std::mutex> lck{WorkerMtx};
WorkerIsRunning = false;
WorkerCV.notify_all();
}
for (auto &t : ThreadPool)
t.join();
}
void Scheduler::add(Task &task, double delayToRun)
{
auto now = std::chrono::steady_clock::now();
long delay_ms = delayToRun * 1000;
std::chrono::milliseconds duration (delay_ms);
timepoint tp = now + duration;
if (now >= tp)
{
/*
* This is a short-cut
* When time is due, the task is directly dispatched to the workers
*/
std::lock_guard<std::mutex> lck{WorkerMtx};
tasksReadyToRunQueue.push_back(&task);
WorkerCV.notify_one();
} else
{
std::lock_guard<std::mutex> lck{TimerMtx};
allTasksQueue.push({tp, &task});
TimerCV.notify_one();
}
}
void Scheduler::worker_thread()
{
for (;;)
{
std::unique_lock<std::mutex> lck{WorkerMtx};
WorkerCV.wait(lck, [this] { return tasksReadyToRunQueue.size() != 0 ||
!WorkerIsRunning; } );
if (!WorkerIsRunning)
break;
Task *p = tasksReadyToRunQueue.back();
tasksReadyToRunQueue.pop_back();
lck.unlock();
p->run();
delete p; // delete Task
}
}
void Scheduler::timer_thread()
{
for (;;)
{
std::unique_lock<std::mutex> lck{TimerMtx};
if (!TimerIsRunning)
break;
auto duration = std::chrono::nanoseconds(1000000000);
if (allTasksQueue.size() != 0)
{
auto now = std::chrono::steady_clock::now();
auto head = allTasksQueue.top();
Task *p = head.taskp;
duration = head.tp - now;
if (now >= head.tp)
{
/*
* A Task is due, pass to worker threads
*/
std::unique_lock<std::mutex> ulck{WorkerMtx};
tasksReadyToRunQueue.push_back(p);
WorkerCV.notify_one();
ulck.unlock();
allTasksQueue.pop();
}
}
TimerCV.wait_for(lck, duration);
}
}
/*
* End sample implementation
*/

class DemoTask : public Task {
int n;
public:
DemoTask(int n=0) : n{n} { }
void run() override
{
std::cout << "Start task " << n << std::endl;;
std::this_thread::sleep_for(std::chrono::seconds(2));
std::cout << " Stop task " << n << std::endl;;
}
};
int main()
{
Scheduler sched;
Task *t0 = new DemoTask{0};
Task *t1 = new DemoTask{1};
Task *t2 = new DemoTask{2};
Task *t3 = new DemoTask{3};
Task *t4 = new DemoTask{4};
Task *t5 = new DemoTask{5};
sched.add(*t0, 7.313);
sched.add(*t1, 2.213);
sched.add(*t2, 0.713);
sched.add(*t3, 1.243);
sched.add(*t4, 0.913);
sched.add(*t5, 3.313);
std::this_thread::sleep_for(std::chrono::seconds(10));
}

这意味着您希望使用某种顺序连续运行所有任务。

您可以创建某种类型的按延迟堆栈(甚至链表)排序的任务。当新任务即将到来时,您应该根据延迟时间将其插入该位置(只需有效地计算该位置并有效地插入新任务)。

从任务堆栈(或列表)的头部开始运行所有任务。

C++11 的核心代码:

#include <thread>
#include <queue>
#include <chrono>
#include <mutex>
#include <atomic>
using namespace std::chrono;
using namespace std;
class Task {
public:
virtual void run() = 0;
};
template<typename T, typename = enable_if<std::is_base_of<Task, T>::value>>
class SchedulerItem {
public:
T task;
time_point<steady_clock> startTime;
int delay;
SchedulerItem(T t, time_point<steady_clock> s, int d) : task(t), startTime(s), delay(d){}
};
template<typename T, typename = enable_if<std::is_base_of<Task, T>::value>>
class Scheduler {
public:
queue<SchedulerItem<T>> pool;
mutex mtx;
atomic<bool> running;
Scheduler() : running(false){}
void add(T task, double delayMsToRun) {
lock_guard<mutex> lock(mtx);
pool.push(SchedulerItem<T>(task, high_resolution_clock::now(), delayMsToRun));
if (running == false) runNext();
}
void runNext(void) {
running = true;
auto th = [this]() {
mtx.lock();
auto item = pool.front();
pool.pop();
mtx.unlock();
auto remaining = (item.startTime + milliseconds(item.delay)) - high_resolution_clock::now();
if(remaining.count() > 0) this_thread::sleep_for(remaining);
item.task.run();
if(pool.size() > 0) 
runNext();
else
running = false;
};
thread t(th);
t.detach();
}
};

测试代码:

class MyTask : Task {
public:
virtual void run() override {
printf("mytask n");
};
};
int main()
{
Scheduler<MyTask> s;
s.add(MyTask(), 0);
s.add(MyTask(), 2000);
s.add(MyTask(), 2500);
s.add(MyTask(), 6000);
std::this_thread::sleep_for(std::chrono::seconds(10));
}

最新更新