c-多线程客户端-服务器同步问题



我正在尝试构建多线程客户端-服务器项目,其中:每个客户端将名称和姓氏插入两个队列中,一个用于名称,另一个用于姓氏。插入后,我创建了N个工作线程,以便在两个队列之间找到匹配项。如果找到匹配项(name==surface(,线程必须删除两个队列中匹配的节点。

我正在考虑两种可能的同步场景:

  1. 一个线程正在工作,N-1被挂起,以便在删除匹配的节点后更新队列。但我认为它效率低下,因为我没有并行线程执行。

  2. 2个线程只更新队列,N-2只同时读取队列,我只锁定正在读取或删除的节点。

下面是第二个场景的执行,但我对嵌套锁感到困惑。

下面的代码适用于服务器。客户端只在队列中插入名称。

updateOne线程正在等待,直到条件变量update变为1。如果为1,则删除匹配的节点,并将条件变量update设置为0,然后释放func线程。

对于所需的同步场景,我想要两件事:

  1. func线程连续运行,直到find3匹配为止。如果一个节点被删除,它们必须读取队列的其余部分,而不是挂起

问题1:**如何让其余的func线程工作而不挂起?**2( updateOne线程可以获取读取器锁,以便遍历队列并删除匹配的节点,而func线程读取的其余节点

问题2如何使用readerLock在节点级别而不是队列级别进行锁定

问题3pop_from_index是否正常工作


#include "list.h"

typedef struct info {
node *info_buy;
node *info_sell;
}info;
int update = 0;
int matches = 0;
pthread_mutex_t reader_lock = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t update_lock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t condition = PTHREAD_COND_INITIALIZER;
void *updateOne (void *args){
info *arg = (info *)args;
node *temp_buy = arg->info_buy;
node *temp_sell = arg->info_sell;
printf("waitingn");
while (1){
pthread_mutex_lock(&update_lock);
while (update != 1){
pthread_cond_wait(&condition, &update_lock);
}
printf("woke upn");
node *tb = temp_buy;
node *ts = temp_sell;

int index_to_delete = -1;
while (tb != NULL){
if (tb->valid == 0){
index_to_delete = tb->n;
printf("Index buy %dn", tb->n);
pop_from_index(&temp_buy, index_to_delete);
}
tb = tb->next;
}
printList(temp_buy);
while (ts != NULL){
if (ts->valid == 0){
index_to_delete = ts->n;
printf("Index sell %dn", ts->n);
pop_from_index(&temp_sell, index_to_delete);
}
ts = ts->next;
}
printList(temp_sell);
update = 0;
pthread_cond_broadcast(&condition);
pthread_mutex_unlock(&update_lock);
}
return NULL;
}
void *func(void *args)
{
// Store the value argument passed to this thread
/*info *n = (info*) args;
pthread_mutex_lock(&reader_lock);
int index = -1;
node *t1 = n->info_buy;
while (t1 != NULL){
node *t2 = n->info_sell;
while (t2 != NULL){
if (strcmp(t1->command, t2->command) == 0){
pthread_mutex_lock(&update_lock);
update = 1;
pthread_cond_broadcast(&condition);
t1->valid = 1;
t2->valid = 1;
pthread_mutex_unlock(&update_lock);
break;
}
t2 = t2->next;
}
t1 = t1->next;
}
pthread_mutex_unlock(&reader_lock);
*/

info *queues = (info *) args;
node *buy = queues->info_buy;
node *sell = queues->info_sell;

while (1){
pthread_mutex_lock(&reader_lock);
node *tb = buy;
while (tb != NULL){
node *ts = sell;
while (ts != NULL){
pthread_mutex_lock(&update_lock);
sleep(1);
while (update == 1){
pthread_cond_wait(&condition, &update_lock);
}
if (strcmp(ts->command, tb->command) == 0){
matches++;
if (matches >= 3){
break;
}
printf("*** Matched! ***n");
printf("Matched pair --> (%s, %s)n", ts->command, tb->command);
ts->valid = 0;
tb->valid = 0;
update = 1;
pthread_cond_broadcast(&condition);
}
ts = ts->next;
pthread_mutex_unlock(&update_lock);
}
tb = tb->next;
}
if (matches >= 3){
break;
}
pthread_mutex_unlock(&reader_lock);
}
return NULL;
}
void *destroy_list(void *args){
pthread_mutex_lock(&update_lock);
info *queues = (info *)args;
destroyList(&(queues->info_buy));
destroyList(&(queues->info_sell));
pthread_mutex_unlock(&update_lock);
return NULL;
}
int main(){
pthread_t workingThreads[5];
pthread_t updateThreads[2];
pthread_t destroy[2];
pthread_cond_init(&condition, NULL);
if (pthread_mutex_init(&reader_lock, NULL) != 0) {
printf("n mutex init has failedn");
return 1;
}
if (pthread_mutex_init(&update_lock, NULL) != 0) {
printf("n mutex init has failedn");
return 1;
}
node *sell = NULL;
node *buy = NULL;
push(&sell, "Maria");
push(&sell, "Sam");
push(&sell, "Tom");
push(&sell, "Takis");
push(&buy, "Mary");
push(&buy, "Edgar");
push(&buy, "Marx");
push(&buy, "Takis");
//
//
info *ptr = malloc(sizeof(info));
ptr->info_buy = sell;
ptr->info_sell = buy;
for (int i = 0; i < 1; i++){
pthread_create(&(updateThreads[i]), NULL, updateOne, ptr);
}
pthread_create(&(workingThreads[0]), NULL, func, ptr);
pthread_create(&(workingThreads[1]), NULL, func, ptr);
pthread_create(&(workingThreads[2]), NULL, func, ptr);
pthread_create(&(workingThreads[3]), NULL, func, ptr);
pthread_create(&(workingThreads[4]), NULL, func, ptr);
//    for (int i = 0; i < 2; i++){
//        pthread_create(&(destroy[i]), NULL, destroy_list, ptr);
//    }

for (int i = 0; i < 2; i++){
pthread_join(updateThreads[i], NULL);
}
for (int i = 0; i < 5; i++){
pthread_join(workingThreads[i], NULL);
}

pthread_mutex_destroy(&reader_lock);
pthread_mutex_destroy(&update_lock);
pthread_cond_destroy(&condition);
destroyList(&sell);
destroyList(&buy);
return 0;
}
void pop_from_index (node **head, int index){


if ((*head) == NULL){
return;
}
node * temp = (*head);
if (index == 0){
*head = temp->next;
free(temp);
return;
}
for (int i=0; temp!=NULL && i< index - 1; i++){
temp = temp->next;
}

// If position is more than number of nodes
if (temp == NULL || temp->next == NULL)
return;
// Node temp->next is the node to be deleted
// Store pointer to the next of node to be deleted
struct node *next = temp->next->next;
free(temp->next);  // Free memory
temp->next = next;  // Unlink the deleted node from list

我认为您正在寻找一个互斥对象。它非常复杂,但简单地说,它是一种锁定/解锁机制,适用于多个线程试图访问相同数据的情况,或者换句话说:线程之间的同步。我想这篇文章会让你对这个话题有一个很好的了解。

相关内容

  • 没有找到相关文章