并发作业调度和处理分配问题



我正在上操作系统理论课,任务之一是:由于交互式作业调度程序main((、jobDispatch((和runJob((同时运行,因此存在并发问题,这将导致该程序偶尔失败(取决于运行机器的CPU/Core数量(。

修改此程序以确保它将在任何CPU/Core配置上运行。提示:您需要确定这3个函数使用的公共资源。然后使用C++的互斥特性来保护资源。

#include <iostream>
#include <thread>
#include <fstream>
#include <string>
#include <vector>
using namespace std;
static const int numJobsMax = 3;  // maximum jobs allowed to run simulataneously
int numJobsRunning = 0;  // check # of jobs currently running
bool programEndFlag = false;  // send program end signal to all threads
int lastJobID = 1;  // unique job ID starts with 1
enum status { jobReadyToRun, jobRunning, jobToBeStopped };  // job running status
// job structure to keep job information
struct job {
string jobName;  // name of the job
int jobID;  // unique # for job ID
status jobStatus = jobReadyToRun;  // initialize status
int jobProcessTime = 15;  // default process time = 10 sec
};
vector<job*> jobQueue;  // system queue for user created jobs
/* function prototyping */
void jobDispatch();  // dispath job from the job queue in the background (thread)
void runJob(job*);  // run job as a thread
void batchJobs();  // run a predefined batch job queue
int main() {
int choice;
job* jobPtr;
/* run Job Dispatcher in the background as a thread */
std::thread t = std::thread(jobDispatch);
t.detach();
/* Allow user to schedule or delete jobs */
programEndFlag = false;
do {
cout << "nJob Scheduling Options:n";
cout << "1: Start a new jobn";
cout << "2: Display job queuen";
cout << "99: Exitn";
cout << "Please enter your choice (#): ";
cin >> choice;
switch (choice) {
case 1:
jobPtr = new job;
cout << "Enter Job Name: ";
cin >> jobPtr->jobName;
jobPtr->jobID = lastJobID++;
jobPtr->jobStatus = jobReadyToRun;  // set status to ReadyToRun for jobDispatch()
jobQueue.push_back(jobPtr);  // push new job to job queue
break;
case 2:
cout << "nNumber of Jobs: " << jobQueue.size() << endl;
for (unsigned int i = 0; i < jobQueue.size(); i++) {
jobPtr = jobQueue[i];
cout << "Name=" << jobPtr->jobName << " ";
cout << "ID=" << jobPtr->jobID << " ";
if (jobPtr->jobStatus == jobRunning) cout << "Status=Running." << endl;
if (jobPtr->jobStatus == jobReadyToRun) cout << "Status=ReadyToRun." << endl;
if (jobPtr->jobStatus == jobToBeStopped) cout << "Status=ToBeStopped." << endl;
}
break;
}
} while (choice != 99);
std::cout << "nMain Program endedn";
programEndFlag = true;
system("pause");
// remove job files before exit program
system("del Job*");
system("del stop_Job*");
return 0;
}
// dispath job from the job queue in the background (thread)
void jobDispatch() {
int activeJobs;

// this thread loops forever until error occurs or program ends
while (true) {
if (programEndFlag) break;  // End this thread when user exit program
// Validate # of running jobs are less than maximum allowed
activeJobs = 0;
for (unsigned int i = 0; i < jobQueue.size(); i++)
if (jobQueue[i]->jobStatus == jobRunning)
activeJobs++;
if (activeJobs > numJobsMax) {
cout << "Error: more jobs running than maximum allowed" << endl;
return;
}
// Run jobs with status ReadyToRun
for (unsigned int i = 0; i < jobQueue.size(); i++) {
if (activeJobs < numJobsMax && jobQueue[i]->jobStatus == jobReadyToRun) {
activeJobs++;
std::thread t = std::thread(runJob, jobQueue[i]);
t.detach();
jobQueue[i]->jobStatus = jobRunning;
}
}
// Terminate jobs with status jobToBeStopped
for (unsigned int i = 0; i < jobQueue.size(); i++) {
if (jobQueue[i]->jobStatus == jobToBeStopped) {
activeJobs--;
string jobFile, cmdLine;
jobFile = "Job" + to_string(jobQueue[i]->jobID) + ".txt";
cmdLine = "ren " + jobFile + " " + "stop_" + jobFile;
system((cmdLine).c_str());
jobQueue.erase(jobQueue.begin() + i);
}
}
}
}
// run job as a thread
void runJob(job* jobPtr) {
string jobFile, cmdLine;
jobFile = "Job" + to_string(jobPtr->jobID) + ".txt";
ofstream outFile(jobFile);
outFile << "Job" << jobPtr->jobID << " - " << jobPtr->jobName << " is running ..." << endl;
this_thread::sleep_for(std::chrono::seconds(jobPtr->jobProcessTime));  // doing something
outFile << "Job completed." << endl;
outFile.close();
jobPtr->jobStatus = jobToBeStopped;  // set job status so it will be removed from queue
}

教授所说的常见资源是作业队列(这是他在讲座中所说的(。起初我认为它就像在void jobDispatch((中添加一个.lock和.ulock(我们有一些作业就像添加两行代码一样简单,所以我认为他可能只是想再次欺骗我们(一样简单,但这给了我错误E0300一个指向绑定函数的指针只能用于调用函数这是我在调用#include和初始化互斥锁Locker时添加的内容;

void jobDispatch() {
int activeJobs;
Locker.lock;
// this thread loops forever until error occurs or program ends
while (true) {
if (programEndFlag) break;  // End this thread when user exit program
// Validate # of running jobs are less than maximum allowed
activeJobs = 0;
for (unsigned int i = 0; i < jobQueue.size(); i++)
if (jobQueue[i]->jobStatus == jobRunning)
activeJobs++;
if (activeJobs > numJobsMax) {
cout << "Error: more jobs running than maximum allowed" << endl;
return;
}
// Run jobs with status ReadyToRun
for (unsigned int i = 0; i < jobQueue.size(); i++) {
if (activeJobs < numJobsMax && jobQueue[i]->jobStatus == jobReadyToRun) {
activeJobs++;
std::thread t = std::thread(runJob, jobQueue[i]);
t.detach();
jobQueue[i]->jobStatus = jobRunning;
}
}
// Terminate jobs with status jobToBeStopped
for (unsigned int i = 0; i < jobQueue.size(); i++) {
if (jobQueue[i]->jobStatus == jobToBeStopped) {
activeJobs--;
string jobFile, cmdLine;
jobFile = "Job" + to_string(jobQueue[i]->jobID) + ".txt";
cmdLine = "ren " + jobFile + " " + "stop_" + jobFile;
system((cmdLine).c_str());
jobQueue.erase(jobQueue.begin() + i);
}
}
Locker.unlock;
//this_thread::sleep_for(std::chrono::seconds(5));  // DELAY 5 sec to AVOID crash (a temporary fix given to us to see exactly how the program works but we are unable to use as our solution)
}
}
}

我认为唯一有帮助的解决方案是https://en.cppreference.com/w/cpp/thread/mutex/unlock我不知道如何把它落实到这项任务中。我可能想得太多了,我的大脑在这一点上完全被炸了,但我已经像以前一样使用了基本的.lock-and-unlock,效果很好。任何帮助/提示/建议/提示都将不胜感激。

给我错误E0300一个指向绑定函数的指针只能用于调用函数

()在函数调用中丢失。应该是Locker.lock()Locker.unlock()

我还建议重新考虑锁的位置。

只有一个线程在运行函数jobDispatcher(),它总是单线程的。你可能会这样想,如果jobQueue是公共资源,有多个线程同时访问和修改jobQueue,那么代码的哪一部分会引起并发问题?锁定所需的最小代码可以提高效率。

最新更新