我有一个多读程序,它的 2 个线程通过消息队列相互通信。第一个线程(发送方)定期发送消息,而第二个线程(接收方)处理信息。
发件人的代码类似于以下内容:
// Create queue
key_t key = ftok("/tmp", 'B');
int msqid = msgget(key, 0664 | IPC_CREAT);
// Create message and send
struct request_msg req_msg;
req_msg.mtype = 1;
snprintf(req_msg.mtext, MSG_LENGTH, "Send this information");
msgsnd(msqid, &req_msg, strlen(req_msg.mtext) + 1, 0);
在接收线程上,我这样做:
// Subscribe to queue
key_t key = ftok("/tmp", 'B');
int msqid = msgget(key, 0664);
struct request_msg req_msg;
while(running)
{
msgrcv(msqid, &req_msg, sizeof(req_msg.mtext), 0, 0);
// Do sth with the message
}
如您所见,接收器位于由名为"running"的全局变量控制的while循环中。如果在进程中遇到错误,错误处理程序会将布尔值设置为 false。这在大多数情况下都有效,但如果在能够将消息发送到队列之前发生错误,接收方将不会退出 while 循环,因为它在继续之前等待消息,从而检查运行变量。这意味着它将永远挂在那里,因为发送方在运行时的其余部分不会发送任何内容。
我想避免这种情况,但我不知道如何让 msgrcv 知道它不能期待更多的消息。如果我杀死队列,我无法了解 msgrcv 的行为方式,假设这是最简单的版本。也许超时或发送某种终止消息(可能使用消息结构的 mtype 成员)也是可能的。
请让我知道这个问题最可靠的解决方案是什么。谢谢!
编辑:根据建议,我重新设计了代码以使信号处理程序操作原子化。
#include <stdbool.h> // bool data type
#include <stdio.h>
#include <signal.h>
#include <stdint.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#define ALARM_INTERVAL_SEC 1
#define ALARM_INTERVAL_USEC 0
struct message
{
uint64_t iteration;
char req_time[28];
};
static volatile bool running = true;
static volatile bool work = false;
static struct itimerval alarm_interval;
static struct timeval previous_time;
static uint64_t loop_count = 0;
static struct message msg;
pthread_mutex_t mutexmsg;
pthread_cond_t data_updated_cv;
static void
termination_handler(int signum)
{
running = false;
}
static void
alarm_handler(int signum)
{
work = true;
}
static void
write_msg(void)
{
// Reset the alarm interval
if(setitimer(ITIMER_REAL, &alarm_interval, NULL) < 0)
{
perror("setitimer");
raise(SIGTERM);
return;
}
struct timeval current_time;
gettimeofday(¤t_time, NULL);
printf("nLoop count: %lun", loop_count);
printf("Loop time: %f usn", (current_time.tv_sec - previous_time.tv_sec) * 1e6 +
(current_time.tv_usec - previous_time.tv_usec));
previous_time = current_time;
// format timeval struct
char tmbuf[64];
time_t nowtime = current_time.tv_sec;
struct tm *nowtm = localtime(&nowtime);
strftime(tmbuf, sizeof(tmbuf), "%Y-%m-%d %H:%M:%S", nowtm);
// write values
pthread_mutex_lock(&mutexmsg);
msg.iteration = loop_count;
snprintf(msg.req_time, sizeof(msg.req_time), "%s.%06ld", tmbuf, current_time.tv_usec);
pthread_cond_signal(&data_updated_cv);
pthread_mutex_unlock(&mutexmsg);
loop_count++;
}
static void*
process_msg(void *args)
{
while(1)
{
pthread_mutex_lock(&mutexmsg);
printf("Waiting for conditionn");
pthread_cond_wait(&data_updated_cv, &mutexmsg);
printf("Condition fulfilledn");
if(!running)
{
break;
}
struct timeval process_time;
gettimeofday(&process_time, NULL);
char tmbuf[64];
char buf[64];
time_t nowtime = process_time.tv_sec;
struct tm *nowtm = localtime(&nowtime);
strftime(tmbuf, sizeof(tmbuf), "%Y-%m-%d %H:%M:%S", nowtm);
snprintf(buf, sizeof(buf), "%s.%06ld", tmbuf, process_time.tv_usec);
// something that takes longer than the interval time
// sleep(1);
printf("[%s] Req time: %s loop cnt: %lun", buf, msg.req_time, msg.iteration);
pthread_mutex_unlock(&mutexmsg);
}
pthread_exit(NULL);
}
int
main(int argc, char* argv[])
{
pthread_t thread_id;
pthread_attr_t attr;
// for portability, set thread explicitly as joinable
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
if(pthread_create(&thread_id, NULL, process_msg, NULL) != 0)
{
perror("pthread_create");
exit(1);
}
pthread_attr_destroy(&attr);
// signal handling setup
struct sigaction t;
t.sa_handler = termination_handler;
sigemptyset(&t.sa_mask);
t.sa_flags = 0;
sigaction(SIGINT, &t, NULL);
sigaction(SIGTERM, &t, NULL);
struct sigaction a;
a.sa_handler = alarm_handler;
sigemptyset(&a.sa_mask);
a.sa_flags = 0;
sigaction(SIGALRM, &a, NULL);
// Set the alarm interval
alarm_interval.it_interval.tv_sec = 0;
alarm_interval.it_interval.tv_usec = 0;
alarm_interval.it_value.tv_sec = ALARM_INTERVAL_SEC;
alarm_interval.it_value.tv_usec = ALARM_INTERVAL_USEC;
if(setitimer(ITIMER_REAL, &alarm_interval, NULL) < 0)
{
perror("setitimer");
exit(1);
}
gettimeofday(&previous_time, NULL);
while(1)
{
// suspending main thread until a signal is caught
pause();
if(!running)
{
// signal the worker thread to stop execution
pthread_mutex_lock(&mutexmsg);
pthread_cond_signal(&data_updated_cv);
pthread_mutex_unlock(&mutexmsg);
break;
}
if(work)
{
write_msg();
work = false;
}
}
// suspend thread until the worker thread joins back in
pthread_join(thread_id, NULL);
// reset the timer
alarm_interval.it_value.tv_sec = 0;
alarm_interval.it_value.tv_usec = 0;
if(setitimer(ITIMER_REAL, &alarm_interval, NULL) < 0)
{
perror("setitimer");
exit(1);
}
printf("EXITn");
pthread_exit(NULL);
}
除了作为同步原语之外,您没有证明使用消息队列的合理性。您可以通过变量和原子标志传递消息,以指示消息就绪情况。然后,此答案描述了如何使用条件变量实现线程挂起和恢复。这就是通常在线程之间完成的方式,尽管当然不是唯一的方法。
我不知道如何让 msgrcv 知道它不能期待更多的消息
没这个必要。只需发送一条消息,告诉线程完成!running
变量不属于:您正在尝试与另一个线程进行通信,因此请按照您选择的方式进行操作:发送消息!
我花了最后一天阅读了很多关于线程和互斥锁的信息,并试图让我的示例程序工作。确实如此,但不幸的是,当我尝试通过 Ctrl+C 关闭它时,它卡住了。 原因是(再次)这一次,工作线程等待来自不再发送信号的主线程的信号。
@Rachid K.和@Unslander Monica:如果你想再看一遍,这是不是更先进的代码?另外,我认为我必须使用pthread_cond_timedwait
而不是pthread_cond_wait
来避免终止死锁。你能告诉我如何处理吗?
请注意,程序只是定期(间隔 1 秒)将时间戳和循环计数器交给打印出数据的处理线程。输出还显示调用打印的时间。
再次感谢!
#include <stdbool.h> // bool data type
#include <stdio.h>
#include <signal.h>
#include <stdint.h>
#include <pthread.h>
#include <stdlib.h>
#define ALARM_INTERVAL_SEC 1
#define ALARM_INTERVAL_USEC 0
static bool running = true;
static struct itimerval alarm_interval;
static struct timeval previous_time;
static uint64_t loop_count = 0;
struct message
{
uint64_t iteration;
char req_time[28];
} msg;
pthread_mutex_t mutexmsg;
pthread_cond_t data_updated_cv;
static void
signal_handler(int signum)
{
if (signum == SIGINT || signum == SIGTERM)
{
running = false;
}
}
static void
write_msg(int signum)
{
if(!running)
{
return;
}
// Reset the alarm interval
if(setitimer(ITIMER_REAL, &alarm_interval, NULL) < 0)
{
perror("setitimer");
raise(SIGTERM);
return;
}
struct timeval current_time;
gettimeofday(¤t_time, NULL);
printf("nLoop count: %lun", loop_count);
printf("Loop time: %f usn", (current_time.tv_sec - previous_time.tv_sec) * 1e6 +
(current_time.tv_usec - previous_time.tv_usec));
previous_time = current_time;
// format timeval struct
char tmbuf[64];
time_t nowtime = current_time.tv_sec;
struct tm *nowtm = localtime(&nowtime);
strftime(tmbuf, sizeof(tmbuf), "%Y-%m-%d %H:%M:%S", nowtm);
// write values
pthread_mutex_lock(&mutexmsg);
msg.iteration = loop_count;
snprintf(msg.req_time, sizeof(msg.req_time), "%s.%06ld", tmbuf, current_time.tv_usec);
pthread_cond_signal(&data_updated_cv);
pthread_mutex_unlock(&mutexmsg);
loop_count++;
}
static void*
process_msg(void *args)
{
while(running)
{
pthread_mutex_lock(&mutexmsg);
printf("Waiting for conditionn");
pthread_cond_wait(&data_updated_cv, &mutexmsg);
printf("Condition fulfilledn");
struct timeval process_time;
gettimeofday(&process_time, NULL);
char tmbuf[64];
char buf[64];
time_t nowtime = process_time.tv_sec;
struct tm *nowtm = localtime(&nowtime);
strftime(tmbuf, sizeof(tmbuf), "%Y-%m-%d %H:%M:%S", nowtm);
snprintf(buf, sizeof(buf), "%s.%06ld", tmbuf, process_time.tv_usec);
printf("[%s] Message req time: %s loop cnt: %lun", buf, msg.req_time, msg.iteration);
pthread_mutex_unlock(&mutexmsg);
}
pthread_exit(NULL);
}
int
main(int argc, char* argv[])
{
pthread_t thread_id;
pthread_attr_t attr;
// for portability, set thread explicitly as joinable
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
if(pthread_create(&thread_id, NULL, process_msg, NULL) != 0)
{
perror("pthread_create");
exit(1);
}
pthread_attr_destroy(&attr);
// signal handling setup
struct sigaction s;
s.sa_handler = signal_handler;
sigemptyset(&s.sa_mask);
s.sa_flags = 0;
sigaction(SIGINT, &s, NULL);
sigaction(SIGTERM, &s, NULL);
struct sigaction a;
a.sa_handler = write_msg;
sigemptyset(&a.sa_mask);
a.sa_flags = 0;
sigaction(SIGALRM, &a, NULL);
// Set the alarm interval
alarm_interval.it_interval.tv_sec = 0;
alarm_interval.it_interval.tv_usec = 0;
alarm_interval.it_value.tv_sec = ALARM_INTERVAL_SEC;
alarm_interval.it_value.tv_usec = ALARM_INTERVAL_USEC;
if(setitimer(ITIMER_REAL, &alarm_interval, NULL) < 0)
{
perror("setitimer");
exit(1);
}
gettimeofday(&previous_time, NULL);
// suspend thread until the worker thread joins back in
pthread_join(thread_id, NULL);
// reset the timer
alarm_interval.it_value.tv_sec = 0;
alarm_interval.it_value.tv_usec = 0;
if(setitimer(ITIMER_REAL, &alarm_interval, NULL) < 0)
{
perror("setitimer");
exit(1);
}
pthread_exit(NULL);
return 0;
}
在接收线程上,我这样做:
... while(running) { msgrcv(msqid, &req_msg, sizeof(req_msg.mtext), 0, 0);
希望实际上你做的远不止这些。
因为您没有检查已发布的代码中的任何错误状态。 对于可能指定为在收到信号时永远不会重新启动的阻塞函数调用来说,这是完全错误的(在 Linux 和 Solaris 上也是如此)。 根据 Linux 'signal(2):
以下接口在中断后永远不会重新启动 通过信号处理程序,无论使用
SA_RESTART
;他们 总是失败,错误EINTR
被信号中断时 处理器:
- 。
- 系统 V IPC 接口:
msgrcv(2)
、msgsnd(2)
、semop(2)
和semtimedop(2)
.
和 Solarissigaction()
:
SA_RESTART
如果设置并捕获信号,则由系统透明地重新启动因执行此信号处理程序而中断的功能,即
fcntl(2)
、ioctl(2)
、wait(3C)
、waitid(2)
,以及终端等慢速设备上的以下功能:getmsg()
和getpmsg()
(见getmsg(2)
);putmsg()
和putpmsg()
(见putmsg(2)
);pread()
、read()
和readv()
(见read(2)
);pwrite()
、write()
和writev()
(见write(2)
);recv()
、recvfrom()
和recvmsg()
(见recv(3SOCKET)
);以及send()
、sendto()
和sendmsg()
(见send(3SOCKET)
)。否则,该函数将返回EINTR
错误。
因此,您的代码需要看起来更像这样,以便处理错误和信号中断:
volatile sig_atomic_t running;
...
while(running)
{
errno = 0;
ssize_t result = msgrcv(msqid, &req_msg, sizeof(req_msg.mtext), 0, 0);
if ( result == ( ssize_t ) -1 )
{
// if the call failed or no longer running
// break the loop
if ( ( errno != EINTR ) || !running )
{
break;
}
// the call was interrupted by a signal
continue
}
...
}
这提供了使用alarm()
和SIGALRM
信号处理程序将running
设置为0
以用作超时的机会:
volatile sig_atomic_t running;
void handler( int sig );
{
running = 0;
}
...
struct sigaction sa;
memset( &sa, 0, sizeof( sa ) );
sa.sa_handler = handler;
sigaction( SIGALRM, &sa, NULL );
while(running)
{
// 10-sec timeout
alarm( 10 );
errno = 0;
ssize_t result = msgrcv( msqid, &req_msg, sizeof(req_msg.mtext), 0, 0 );
// save errno as alarm() can munge it
int saved_errno = errno;
// clear alarm if it hasn't fired yet
alarm( 0 );
if ( result == ( ssize_t ) -1 )
{
// if the call failed or no longer running
// break the loop
if ( ( saved_errno != EINTR ) || !running )
{
break;
}
// the call was interrupted by a signal
continue
}
...
}
这几乎可以肯定是可以改进的 - 捕获所有极端情况的逻辑相当复杂,并且可能有一种更简单的方法可以做到这一点。
对问题中新提案的回答
循环- 计时器应在主线程的事件循环中重新设置,以获得更好的可见性(主观建议);
- 当辅助线程脱离其循环时,它必须释放 互斥锁,否则主线程将进入死锁(等待 对于被终止的辅助线程锁定的互斥锁)。
因此,这是具有上述修复/增强功能的最后一个建议:
#include <stdbool.h> // bool data type
#include <stdio.h>
#include <signal.h>
#include <stdint.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/time.h>
#define ALARM_INTERVAL_SEC 1
#define ALARM_INTERVAL_USEC 0
struct message
{
uint64_t iteration;
char req_time[28];
};
static volatile bool running = true;
static volatile bool work = false;
static struct itimerval alarm_interval;
static struct timeval previous_time;
static uint64_t loop_count = 0;
static struct message msg;
pthread_mutex_t mutexmsg;
pthread_cond_t data_updated_cv;
static void
termination_handler(int signum)
{
running = false;
}
static void
alarm_handler(int signum)
{
work = true;
}
static void
write_msg(void)
{
struct timeval current_time;
gettimeofday(¤t_time, NULL);
printf("nLoop count: %lun", loop_count);
printf("Loop time: %f usn", (current_time.tv_sec - previous_time.tv_sec) * 1e6 +
(current_time.tv_usec - previous_time.tv_usec));
previous_time = current_time;
// format timeval struct
char tmbuf[64];
time_t nowtime = current_time.tv_sec;
struct tm *nowtm = localtime(&nowtime);
strftime(tmbuf, sizeof(tmbuf), "%Y-%m-%d %H:%M:%S", nowtm);
// write values
pthread_mutex_lock(&mutexmsg);
msg.iteration = loop_count;
snprintf(msg.req_time, sizeof(msg.req_time), "%s.%06ld", tmbuf, current_time.tv_usec);
pthread_cond_signal(&data_updated_cv);
pthread_mutex_unlock(&mutexmsg);
loop_count++;
}
static void*
process_msg(void *args)
{
while(1)
{
pthread_mutex_lock(&mutexmsg);
printf("Waiting for conditionn");
pthread_cond_wait(&data_updated_cv, &mutexmsg);
printf("Condition fulfilledn");
if(!running)
{
pthread_mutex_unlock(&mutexmsg); // <----- To avoid deadlock
break;
}
struct timeval process_time;
gettimeofday(&process_time, NULL);
char tmbuf[64];
char buf[64];
time_t nowtime = process_time.tv_sec;
struct tm *nowtm = localtime(&nowtime);
strftime(tmbuf, sizeof(tmbuf), "%Y-%m-%d %H:%M:%S", nowtm);
snprintf(buf, sizeof(buf), "%s.%06ld", tmbuf, process_time.tv_usec);
// something that takes longer than the interval time
//sleep(2);
printf("[%s] Req time: %s loop cnt: %lun", buf, msg.req_time, msg.iteration);
pthread_mutex_unlock(&mutexmsg);
}
printf("Thread exiting...n");
pthread_exit(NULL);
}
int
main(int argc, char* argv[])
{
pthread_t thread_id;
pthread_attr_t attr;
// for portability, set thread explicitly as joinable
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
if(pthread_create(&thread_id, NULL, process_msg, NULL) != 0)
{
perror("pthread_create");
exit(1);
}
pthread_attr_destroy(&attr);
// signal handling setup
struct sigaction t;
t.sa_handler = termination_handler;
sigemptyset(&t.sa_mask);
t.sa_flags = 0;
sigaction(SIGINT, &t, NULL);
sigaction(SIGTERM, &t, NULL);
struct sigaction a;
a.sa_handler = alarm_handler;
sigemptyset(&a.sa_mask);
a.sa_flags = 0;
sigaction(SIGALRM, &a, NULL);
// Set the alarm interval
alarm_interval.it_interval.tv_sec = 0;
alarm_interval.it_interval.tv_usec = 0;
alarm_interval.it_value.tv_sec = ALARM_INTERVAL_SEC;
alarm_interval.it_value.tv_usec = ALARM_INTERVAL_USEC;
if(setitimer(ITIMER_REAL, &alarm_interval, NULL) < 0)
{
perror("setitimer");
exit(1);
}
gettimeofday(&previous_time, NULL);
while(1)
{
// Reset the alarm interval <-------- Rearm the timer in the main loop
if(setitimer(ITIMER_REAL, &alarm_interval, NULL) < 0)
{
perror("setitimer");
raise(SIGTERM);
break;
}
// suspending main thread until a signal is caught
pause();
if(!running)
{
// signal the worker thread to stop execution
pthread_mutex_lock(&mutexmsg);
pthread_cond_signal(&data_updated_cv);
pthread_mutex_unlock(&mutexmsg);
break;
}
if(work)
{
write_msg();
work = false;
}
}
// suspend thread until the worker thread joins back in
pthread_join(thread_id, NULL);
// reset the timer
alarm_interval.it_value.tv_sec = 0;
alarm_interval.it_value.tv_usec = 0;
if(setitimer(ITIMER_REAL, &alarm_interval, NULL) < 0)
{
perror("setitimer");
exit(1);
}
printf("EXITn");
pthread_exit(NULL);
}
====
======================================================================对原始问题的回答
可以使用条件变量来等待来自发送方的信号。这使接收方唤醒并通过在msgrcv()的 flags 参数中传递IPC_NOWAIT来检查消息队列中的消息。要结束通信,可以发布"通信结束"消息。也可以使用 pthread_con_timedwait() 定期唤醒并检查"通信结束"或"接收器结束条件"(例如,通过检查全局"正在运行"变量)。
接收器侧:
// Mutex initialization
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
// Condition variable initialization
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
[...]
while (1) {
// Lock the mutex
pthread_mutex_lock(&mutex);
// Check for messages (non blocking thanks to IPC_NOWAIT)
rc = msgrcv(msqid, &req_msg, sizeof(req_msg.mtext), 0, IPC_NOWAIT);
if (rc == -1) {
if (errno == ENOMSG) {
// message queue empty
// Wait for message notification
pthread_cond_wait(&cond, &mutex); // <--- pthread_cond_timedwait() can be used to wake up and check for the end of communication or senders...
} else {
// Error
}
}
// Handle the message, end of communication (e.g. "running" variable)...
// Release the lock (so that the sender can post something in the queue)
pthread_mutex_unlock(&mutex);
}
发送方:
// Prepare the message
[...]
// Take the lock
pthread_mutex_lock(&mutex);
// Send the message
msgsnd(msqid, &req_msg, strlen(req_msg.mtext) + 1, 0);
// Wake up the receiver
pthread_cond_signal(&cond);
// Release the lock
pthread_mutex_unlock(&mutex);
注意:SYSV 消息队列已过时。最好使用全新的Posix服务。