我正在尝试使用多线程创建一个HTTP服务器main((将client_sock从accept((移交给其中一个工作线程。如果没有可用的工作线程,它会等待,直到有一个可用。我被限制为不能在工作线程内调用accept((。这是我迄今为止代码的一部分。我有一些问题是:
- 我需要像现在这样使用2phread互斥和条件变量吗
- 在这些情况下,我需要使用pthread锁定还是完全解锁
- 如果我想在服务器上创建文件时添加一个互斥锁,我是否必须创建另一个互斥变量,或者现有的一个是否有效
#include <iostream>
#include <err.h>
#include <fcntl.h>
#include <netdb.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <getopt.h>
#include <pthread.h>
#define SIZE 1024
struct shared_data
{
int redundancy;
int client_sock;
int working_threads;
int dispatch_ready;
pthread_mutex_t* dispatch_mutex;
pthread_mutex_t* worker_mutex;
pthread_cond_t* dispatch_cond;
pthread_cond_t* worker_cond;
};
void* receiveAndSend(void* obj)
{
struct shared_data* data = (struct shared_data*) obj;
int bytes;
char buff[SIZE + 1];
while(1)
{
while(!data->dispatch_ready)
{
pthread_cond_wait(data->dispatch_cond, data->dispatch_mutex);
}
data->dispatch_ready = 0;
data->working_threads++;
client_sock = data->client_sock;
bytes = recv(client_sock, buff, SIZE, 0);
// do work
data->working_threads--;
pthread_cond_signal(data->worker_cond);
}
}
int main(int argc, char* argv[])
{
if(argc < 2 || argc > 6)
{
char msg[] = "Error: invalid arg amountn";
write(STDERR_FILENO, msg, strlen(msg));
exit(1);
}
char* addr = NULL;
unsigned short port = 80;
int num_threads = 4;
int redundancy = 0;
char opt;
while((opt = getopt(argc, argv, "N:r")) != -1)
{
if(opt == 'N')
{
num_threads = atoi(optarg);
if(num_threads < 1)
{
char msg[] = "Error: invalid input for -N argumentn";
write(STDERR_FILENO, msg, strlen(msg));
exit(1);
}
}
else if(opt == 'r')
{
redundancy = 1;
}
else
{
// error (getopt automatically sends an error message)
return 1;
}
}
// non-option arguments are always the last indexes of argv, no matter how they are written in the terminal
// optind is the next index of argv after all options
if(optind < argc)
{
addr = argv[optind];
optind++;
}
if(optind < argc)
{
port = atoi(argv[optind]);
}
if(addr == NULL)
{
char msg[] = "Error: no address specifiedn";
write(STDERR_FILENO, msg, strlen(msg));
exit(1);
}
struct sockaddr_in serv_addr;
memset(&serv_addr, 0, sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = getaddr(addr);
serv_addr.sin_port = htons(port);
int serv_sock = socket(AF_INET, SOCK_STREAM, 0);
if(serv_sock < 0)
{
err(1, "socket()");
}
if(bind(serv_sock, (struct sockaddr*) &serv_addr, sizeof(serv_addr)) < 0)
{
err(1, "bind()");
}
if(listen(serv_sock, 500) < 0)
{
err(1, "listen()");
}
// Connecting with a client
struct sockaddr client_addr;
socklen_t client_addrlen;
pthread_mutex_t dispatch_mutex;
pthread_mutex_init(&dispatch_mutex, NULL);
pthread_mutex_t worker_mutex;
pthread_mutex_init(&worker_mutex, NULL);
pthread_cond_t dispatch_cond;
pthread_cond_init(&dispatch_cond, NULL);
pthread_cond_t worker_cond;
pthread_cond_init(&worker_cond, NULL);
struct shared_data data;
data.redundancy = redundancy;
data.dispatch_ready = 0;
data.working_threads = 0;
data.dispatch_mutex = &dispatch_mutex;
data.worker_mutex = &worker_mutex;
data.dispatch_cond = &dispatch_cond;
data.worker_cond = &worker_cond;
pthread_t* threads = new pthread_t[num_threads];
for (int i = 0; i < num_threads; i++)
{
pthread_create(&threads[i], NULL, receiveAndSend, &data);
}
while(1)
{
data.client_sock = accept(serv_sock, &client_addr, &client_addrlen);
while(data.working_threads == num_threads)
{
pthread_cond_wait(data.worker_cond, data.worker_mutex);
}
data.dispatch_ready = 1;
pthread_cond_signal(data.dispatch_cond);
}
return 0;
}
您的程序中有许多非常基本的错误,这些错误非常清楚地表明您不了解锁和条件变量(或指针的适当使用(。
锁保护某些共享数据。您只有一个共享数据项,因此您应该只需要一个锁(互斥锁(来保护它
条件变量表示某个条件为true。您的用例的合理条件是worker_available
和work_available
。(将条件变量命名为dispatch_cond
和worker_cond
无助于澄清。(
一个条件变量总是与一个互斥体相关联,但您不需要两个单独的互斥体,因为您有两个条件变量。
打开Bug。
这个代码显然有缺陷:
while(1)
{
while(!data->dispatch_ready)
{
pthread_cond_wait(data->dispatch_cond, data->dispatch_mutex);
}
来自man pthread_cond_wait
:
原子释放
mutex
并导致调用线程阻塞条件变量cond
如果这个线程从未获取互斥,它怎么能释放
此外,该线程如何在不获取互斥的情况下读取data->dispatch_ready
(与其他线程共享(?
此代码:
struct shared_data data;
data.redundancy = redundancy;
data.dispatch_ready = 0;
data.working_threads = 0;
data.dispatch_mutex = &dispatch_mutex;
data.worker_mutex = &worker_mutex;
data.dispatch_cond = &dispatch_cond;
data.worker_cond = &worker_cond;
没有bug,但有不必要的间接性。您可以使dispatch_mutex
和条件变量成为shared_data
的部分,如下所示:
struct shared_data
{
int redundancy;
int client_sock;
int working_threads;
int dispatch_ready;
pthread_mutex_t dispatch_mutex;
pthread_mutex_t worker_mutex;
pthread_cond_t dispatch_cond;
pthread_cond_t worker_cond;
};
这里是我注意到的最微妙的错误:
data.client_sock = accept(serv_sock, &client_addr, &client_addrlen);
...
data.dispatch_ready = 1;
pthread_cond_signal(data.dispatch_cond);
在这里,您将唤醒至少一个等待dispatch_cond
的线程,但可能会唤醒多个线程。如果多个线程被唤醒,那么它们都将在同一个client_sock
上继续执行recv
,这可能会带来灾难性的结果。
更新:
如何修复此问题。
可能解决这一问题的最佳和最具性能的方法是有一个";工作项目";(例如使用带有头指针和尾指针的双链表(,由锁保护。
主线程将在尾部添加元素(同时保持锁(;不为空";条件变量。
工作线程将移除head元素(当持有锁时(
工作线程将阻塞";不为空";队列为空时的条件变量。
当队列已满(所有工作线程都很忙(时,主线程可以继续添加元素,也可以阻止等待工作线程变为可用,或者它可以返回";429太多的请求";到客户端。