c语言 - 我应该在这里使用哪种同步原语?


while(1) {
char message_buffer[SIZE];
ssize_t message_length = mq_receive(mq_identifier, message_buffer, _mqueue_max_msg_size NULL);
if(message_len == -1) { /* error handling... */}
pthread_t pt1;
int ret = pthread_create(&pt1, NULL, handle_message, message_buffer);
if(ret) { /* error handling ... */}
}
void * handle_message (void * message) {
puts((char *) message);
return NULL;
}

上面的例子不是 MRE,但它非常简单:

我有一个带有循环的主线程,该循环不断消耗来自消息队列的消息。收到新消息后,该消息将存储在本地message_buffer缓冲区中。然后,一个新的线程被生成来"处理"所说的新消息,因此消息缓冲区的地址被传递到handle_message,新线程随后执行。


问题所在

通常,2 个线程将打印相同的消息,即使我可以 100% 确定队列中的消息不同。


我不完全确定,但我想我明白为什么会发生这种情况:

假设我将 2 条不同的消息推送到 mqueue,然后我才开始使用它们。

while循环的第一次迭代中,消息将从队列中使用并保存到message_buffer。将生成一个新线程,并将message_length的地址传递给它。但是,该线程可能不够快,无法在下一条消息被使用之前(在循环的下一次迭代中)缓冲区的内容打印到流中,并且随后覆盖message_buffer的内容。因此,第一个和第二个线程现在打印相同的值。


我的问题是:解决这个问题的最有效方法是什么?我对并行编程和线程/pthreads很陌生,我对不同的同步原语感到不知所措。

互斥故障

static pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
while(1) {
char message_buffer[SIZE];
pthread_mutex_lock(&m);
ssize_t message_length = mq_receive(mq_identifier, message_buffer, _mqueue_max_msg_size NULL);
pthred_mutex_unlock(&m);
if(message_len == -1) { /* error handling... */}
pthread_t pt1;
int ret = pthread_create(&pt1, NULL, handle_message, message_buffer);
if(ret) { /* error handling ... */}
}
void * handle_message (void * message) {
char own_buffer[SIZE];
pthread_mutex_lock(&m);
strncpy(own_buffer, (char *) message, SIZE);
pthread_mutex_unlock(&m);
puts(own_buffer);
return NULL;
}

我认为我当前的互斥实现不正确,因为线程仍在接收重复的消息。主线程可以锁定互斥锁,将消息消耗到缓冲区中,解锁互斥锁,生成线程,但该线程仍然可能挂起,主线程可以再次重写缓冲区(因为缓冲区互斥锁从未被新线程锁定),有效地使我当前的互斥锁实现无用?我该如何克服这个问题?

问题是在保证线程已完成该内存之前,您结束了包含message_buffer的循环。

while (1) {
char message_buffer[SIZE];
ssize_t message_length = mq_receive(...);
if (message_len == -1) { /* error handling */ }
pthread_t pt1;
int ret = pthread_create(&pt1, NULL, handle_message, message_buffer);
if (ret) { /* error handling */ }
/****** Can't go beyond here until thread is done with message_buffer. ******/
}
void * handle_message (void * message) {
char own_buffer[SIZE];
strncpy(own_buffer, (char *) message, SIZE);
/******* Only now can the caller loop back. ******/
puts(own_buffer);
return NULL;
}

您可以使用信号量或类似工具。

static pthread_mutex_t mutex  = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  cond   = PTHREAD_COND_INITIALIZER;
static int             copied = 0;
while (1) {
char message_buffer[SIZE];
ssize_t message_length = mq_receive(...);
if (message_len == -1) { /* error handling */ }
pthread_t pt1;
int ret = pthread_create(&pt1, NULL, handle_message, message_buffer);
if (ret) { /* error handling */ }
// Wait until threads is done with message_buffer.
pthread_mutex_lock(&mutex);
while (!copied) pthread_cond_wait(&cond, &mutex);
copied = 0;
pthread_mutex_unlock(&mutex);
}
void * handle_message (void * message) {
char own_buffer[SIZE];
strncpy(own_buffer, (char *) message, SIZE);
// Done with caller's buffer.
// Signal caller to continue.
pthread_mutex_lock(&mutex);
copied = 1;
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
puts(own_buffer);
return NULL;
}

(添加的区块可有效地执行信号量操作。有关更通用的实现,请参阅此答案的最后一个片段。

但是有一个更简单的解决方案:在创建线程之前制作副本。

while (1) {
char message_buffer[SIZE];
ssize_t message_length = mq_receive(...);
if (message_len == -1) { /* error handling */ }
pthread_t pt1;
int ret = pthread_create(&pt1, NULL, handle_message, strdup(message_buffer));
if (ret) { /* error handling */ }
}
void * handle_message (void * message) {
char * own_buffer = message;
puts(own_buffer);
free(own_buffer);
return NULL;
}

最新更新