在多个子进程之间通过 C 中的管道传输数据(UNIX 环境)



今天我在 C 语言中遇到了一个小问题,我似乎无法弄清楚为什么我的进程在使用管道的方式上没有"排队"。

我有 2 个子进程,从父进程分叉,它们正在写入一些数据,它们应该将这些数据写入管道。 然后,父进程定期读取管道,并"清空"管道中的数据,并打印到终端行。 然后,整个事情重复自己。

//Creates Three Processes A,B,C which communicate data through a pipe.
int main(void) {
  int     fd[2], nbytes;
  pid_t   childpidB, childpidC;
  char    readbuffer[100];
  readbuffer[0] = '';
  pipe(fd);
  if((childpidB = fork()) < 0)
    {
      perror("fork B error.");
      exit(1);
    }
  if(childpidB == 0)
    {
      close(fd[0]);
      char AAA[3] = {'A','A','A'};
      int j = 0;
      int k = 1;
      while (j < 500) {
        if(j%100==0 && j!=0) {
          usleep(100);
        }
        char numbers[6];
        numbers[0] = '';
        sprintf(numbers,"%03dAAA",k);
        k++;
        j++;
        write(fd[1], numbers, (strlen(numbers)+1));
      }
      close(fd[1]);
      exit(0);
    }
  if((childpidC = fork()) < 0)
    {
      perror("fork C error.");
      exit(1);
    }
  if(childpidC == 0)
    {
      close(fd[0]);
      int i = 0;
      char c = 'A';
      int numberafter = 0;
      while (i < 260) {
        if(i%60==0 && i!=0) {
          usleep(300);
        }
      char dump[3];
      dump[0] = '';
        if(c=='[') { c='A'; numberafter++; }
        sprintf(dump,"%cx%dn",c,numberafter);
        c++;
        write(fd[1], dump, (strlen(dump)+1));    
        i++;
      }
      close(fd[1]);
      exit(0);
    }
  //Process A
  if(childpidC > 0 && childpidB > 0) {
    int x = 0;
    close(fd[1]);
    while (nbytes = read(fd[0], readbuffer, sizeof(readbuffer)) > 0) {
      if(x%50==0 && x!=0) {
    usleep(200);
      }
      printf("%sn", readbuffer);
      x++;
    }
    close(fd[0]);
    exit(0);
  }
}

我的问题是父进程并不总是从管道中读取 100 个字符,我不确定这是怎么发生的。 它只是从每个管道读取一小段数据,然后再中断。 它应该每次从管道中读取 100 个字符。 下面是输出的示例:

001AAA
Ax0
Gx0
Ox0
...(keeps going for a while)...

我想知道为什么/如何每次只从管道中读取少量数据。 它正在读取的数据实际上看起来是正确的,但它只是没有每行读取 100 个字符(因为我认为在我的代码中逻辑上应该是这样)。 然后,它还以某种方式从管道中读取任何内容,并且只写入一个空行(因为它不读取任何字符,但仍打印一个空行)。 这必然意味着子进程在每次迭代时只将几个字符写入管道,但不明白这是怎么回事。 每次运行程序时,输出也不同

似乎子进程和父进程不同步,但我认为关闭 fd[0] 或 fd[1] 的适当插槽可以解释这一点。

编辑:我已经更新了代码以更接近我想要的(或者也许正是我想要的,需要更多的测试)。 这与传递太多 NULL 字符有关。

int main(void) {
  int     fd[2], nbytes;
  pid_t   childpidB, childpidC;
  char    readbuffer[100];
  readbuffer[0] = '';
  pipe(fd);
  if((childpidB = fork()) < 0)
    {
      perror("fork A error.");
      exit(1);
    }
  if(childpidB == 0)
    {
      close(fd[0]);
      char AAA[3] = {'A','A','A'};
      int j = 0;
      int k = 1;
      while (j < 500) {
    if(j%100==0 && j!=0) {
      usleep(100);
    }
    char numbers[6];
    numbers[0] = '';
    sprintf(numbers,"%03dAAA",k);
    k++;
    j++;
    write(fd[1], numbers, (strlen(numbers)));
      }
      close(fd[1]);
      exit(0);
    }
  if((childpidC = fork()) < 0)
    {
      perror("fork B error.");
      exit(1);
    }
  if(childpidC == 0)
    {
      close(fd[0]);
      int i = 0;
      char c = 'A';
      int numberafter = 0;
      while (i < 260) {
    if(i%60==0 && i!=0) {
      usleep(300);
    }
    char dump[3];
    dump[0] = '';
    if(c=='[') { c='A'; numberafter++; }
    sprintf(dump,"%cx%d",c,numberafter);
    c++;
    write(fd[1], dump, (strlen(dump)));    
    i++;
      }
      close(fd[1]);
      exit(0);
    }
  //Process A
  int x = 0;
  close(fd[1]);
  if(childpidB > 0 && childpidC > 0) {
    while ((nbytes = read(fd[0], readbuffer, sizeof(readbuffer))) > 0) {
      if(x%50==0 && x!=0) {
    usleep(200);
      }
      printf("%sn", readbuffer);
      x++;
    }
  }
  close(fd[0]);
  exit(0);
}

问题出在您发送给父级的值中:

write(fd[1], numbers, (strlen(numbers)+1));
...
write(fd[1], dump, (strlen(dump)+1)); 

两次写入都将包含字符,因为strlen()之后的表达式中+1,因此

printf("%sn", readbuffer);

将停止从第一个字节后的readbuffer输出数据。

父级实际上确实接收 100 字节(或更少,当两个孩子都出去时)。

因此,不要通过删除+1 ;)来发送空值

编辑:

似乎childpidC会睡得更久(4 * 300 = 1200 us vs 4 * 100 = 400 us)。因此,当childpidB首先退出时,OS 会将SIGCHLD发送给父级,以通知它其中一个子项退出。在同一时刻,read()将被中断,并可能返回小于 sizeof(readbuffer) 甚至-1(错误EINTR)。

下面是从管道中不间断读取的示例函数:

ssize_t read_no_intr(int pipe_fd, void * buffer, size_t size)
{
    if (!size)
        return 0;
    ssize_t bytes_read = 0;
    ssize_t batch;
    char * p_buffer = static_cast<char*>(buffer);
    while ((size_t)bytes_read < size) {
        batch = ::read(pipe_fd, p_buffer, size - bytes_read, 0);
        if (batch == -1) {
            if(errno == EINTR) {
                // Interrupted by a signal, retry.
                continue;
            } else if (!bytes_read) {
                // An error, and no bytes were read so far.
                return -1;
            } else {
                // An error, but something was read.
                return bytes_read;
            }
        } else if(!batch) {
            // 0 means end of a stream.
            break;
        }
        bytes_read += batch;
        p_buffer += batch;
    }
    return bytes_read;
}

最新更新