从这个问题中我知道我可以在另一个线程阻塞epoll_wait(2)
时调用epoll_ctl(2)
。不过我还有一个问题。
当使用带有EPOLLONESHOT
标志的epoll
时,只会触发一个事件,并且必须使用epoll_ctl(2)
重新武装fd。这是必需的,所以只有一个线程将从 FD 读取并适当地处理结果。
以下是我假设的问题的时间表:
Thread1: Thread2: Kernel:
-----------------------------------------------------------------------
epoll_wait();
Receives chunk
dispatch chunk to thread 2
epoll_wait(); Handle chunk
Still handle chunk Receives chunk
Rearm fd for epoll
?
当收到块后重新武装 fd 时,问号上会发生什么?epoll
会触发EPOLLIN
事件,还是会无限期地阻止,尽管套接字是可读的?我的架构是否合理?
您的体系结构是明智的,并且可以正常工作:epoll
会将文件描述符标记为可读并触发EPOLLIN
事件。
这方面的文档很少而且很微妙;man 7 epoll
的问答部分简要提到了这一点:
您可以对现有文件描述符Q8 对文件描述符的操作是否会影响已收集的 但尚未报告事件?
A8 您可以对一个 现有文件描述符。在这种情况下,删除将毫无意义。 修改将重新读取可用的 I/O。
(现有文件描述符是过去已添加到 epoll 集中的文件描述符 - 这包括等待重新修复的文件描述符(执行的两个操作是删除和修改。正如手册页所提到的,删除在这里毫无意义,修改将重新评估文件描述符中的条件。
不过,没有什么比现实世界的实验更胜一筹了。以下程序测试此边缘情况:
#include <stdio.h>
#include <pthread.h>
#include <signal.h>
#include <stdlib.h>
#include <assert.h>
#include <semaphore.h>
#include <sys/epoll.h>
#include <unistd.h>
static pthread_t tids[2];
static int epoll_fd;
static char input_buff[512];
static sem_t chunks_sem;
void *dispatcher(void *arg) {
struct epoll_event epevent;
while (1) {
printf("Dispatcher waiting for more chunksn");
if (epoll_wait(epoll_fd, &epevent, 1, -1) < 0) {
perror("epoll_wait(2) error");
exit(EXIT_FAILURE);
}
ssize_t n;
if ((n = read(STDIN_FILENO, input_buff, sizeof(input_buff)-1)) <= 0) {
if (n < 0)
perror("read(2) error");
else
fprintf(stderr, "stdin closed prematurelyn");
exit(EXIT_FAILURE);
}
input_buff[n] = ' ';
sem_post(&chunks_sem);
}
return NULL;
}
void *consumer(void *arg) {
sigset_t smask;
sigemptyset(&smask);
sigaddset(&smask, SIGUSR1);
while (1) {
sem_wait(&chunks_sem);
printf("Consumer received chunk: %s", input_buff);
/* Simulate some processing... */
sleep(2);
printf("Consumer finished processing chunk.n");
printf("Please send SIGUSR1 after sending more data to stdinn");
int signo;
if (sigwait(&smask, &signo) < 0) {
perror("sigwait(3) error");
exit(EXIT_FAILURE);
}
assert(signo == SIGUSR1);
struct epoll_event epevent;
epevent.events = EPOLLIN | EPOLLONESHOT;
epevent.data.fd = STDIN_FILENO;
if (epoll_ctl(epoll_fd, EPOLL_CTL_MOD, STDIN_FILENO, &epevent) < 0) {
perror("epoll_ctl(2) error when attempting to readd stdin");
exit(EXIT_FAILURE);
}
printf("Readded stdin to epoll fdn");
}
}
int main(void) {
sigset_t sigmask;
sigfillset(&sigmask);
if (pthread_sigmask(SIG_SETMASK, &sigmask, NULL) < 0) {
perror("pthread_sigmask(3) error");
exit(EXIT_FAILURE);
}
if ((epoll_fd = epoll_create(1)) < 0) {
perror("epoll_create(2) error");
exit(EXIT_FAILURE);
}
struct epoll_event epevent;
epevent.events = EPOLLIN | EPOLLONESHOT;
epevent.data.fd = STDIN_FILENO;
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, STDIN_FILENO, &epevent) < 0) {
perror("epoll_ctl(2) error");
exit(EXIT_FAILURE);
}
if (sem_init(&chunks_sem, 0, 0) < 0) {
perror("sem_init(3) error");
exit(EXIT_FAILURE);
}
if (pthread_create(&tids[0], NULL, dispatcher, NULL) < 0) {
perror("pthread_create(3) error on dispatcher");
exit(EXIT_FAILURE);
}
if (pthread_create(&tids[1], NULL, consumer, NULL) < 0) {
perror("pthread_create(3) error on consumer");
exit(EXIT_FAILURE);
}
size_t i;
for (i = 0; i < sizeof(tids)/sizeof(tids[0]); i++) {
if (pthread_join(tids[i], NULL) < 0) {
perror("pthread_join(3) error");
exit(EXIT_FAILURE);
}
}
return 0;
}
它的工作原理如下:调度程序线程将stdin
添加到 epoll 集,然后在 stdin
变得可读时使用 epoll_wait(2)
从输入获取输入。当输入到达时,调度程序唤醒工作线程,该线程打印输入并通过休眠 2 秒来模拟一些处理时间。与此同时,调度程序返回到主循环并再次阻塞epoll_wait(2)
。
工作线程不会重置stdin
,直到您通过将其发送到SIGUSR1
来告诉它。所以,我们只是在stdin
中写入更多的东西,然后将SIGUSR1
发送到流程中。工作线程收到信号,然后它才重新武装stdin
- 那时已经可读,调度程序已经在等待epoll_wait(2)
您可以从输出中看到调度程序已正确唤醒,并且一切都像魅力一样工作:
Dispatcher waiting for more chunks
testing 1 2 3 // Input
Dispatcher waiting for more chunks // Dispatcher notified worker and is waiting again
Consumer received chunk: testing 1 2 3
Consumer finished processing chunk.
Please send SIGUSR1 after sending more data to stdin
hello world // Input
Readded stdin to epoll fd // Rearm stdin; dispatcher is already waiting
Dispatcher waiting for more chunks // Dispatcher saw new input and is now waiting again
Consumer received chunk: hello world
Consumer finished processing chunk.
Please send SIGUSR1 after sending more data to stdin