Structuring critical sections for multiple threads



I have a producer thread that reads each character and its offset from a source file and writes them to a shared circular buffer.

I also have a consumer thread that reads the oldest element in the buffer and writes it to a copy file.

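The numbers of producer and consumer threads are given as command-line arguments, and each time a producer or consumer thread reads from or writes to the buffer, or reads from or writes to a file, a specific line is written to a log file.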

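Right now I have a single critical section in each of the producer and consumer threads. How can I structure it to reduce the time spent inside the critical section (it is getting slow)? I am considering using multiple critical sections in my producer and consumer threads. For example, in my producer thread I could have one critical section for reading from the source file and writing a specific line (e.g. "read from file") to the log file, and another critical section for writing to the buffer and writing a specific line (e.g. "wrote to buffer") to the log file. But what difference does that make if the critical sections come one right after the other?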

Here is my producer thread:

void *INthread(void *arg)
{
    printf("INSIDE INthread\n");
    FILE *srcFile = (FILE*)arg;
    FILE *lp; // Log file pointers.
    int t_id = INid++; // Thread number.
    int curOffset;
    BufferItem result;
    struct timespec t;
    t.tv_sec = 0;
    t.tv_nsec = rand()%(TEN_MILLIS_IN_NANOS+1);
    nanosleep(&t, NULL);
    fseek(srcFile, 0, SEEK_CUR);
    curOffset = ftell(srcFile); // Save the current byte offset.
    fseek(srcFile, 0, SEEK_END);
    if(len == 0) // If len hasn't been set by the first IN thread yet.
        len = ftell(srcFile); // Save the length of the srcFile (number of chars).
    fseek(srcFile, curOffset, SEEK_SET); // Revert file pointer to curOffset.
    printf("ID %d number of bytes %d\n", t_id, len);
    int offs;
    int ch;
    while(len > 0) // Go through each byte/char in file.
    {
        /*** CRITICAL SECTION ********************************/
        sem_wait(&empty); /* acquire the empty lock */
        pthread_mutex_lock( &pt_mutex );
        if(len > 0){
            fseek(srcFile, 0, SEEK_CUR);
            if((offs = ftell(srcFile)) != -1)
                result.offset = offs;     /* get position of byte in file */
            if((ch = fgetc(srcFile)) != EOF)
                result.data = ch;       /* read byte from file */
            // Write to log file "read_byte PTn Ox Bb I-1".
            if (!(lp = fopen(log, "a"))) {
                printf("could not open log file for writing");
            }
            else if(fprintf(lp, "read_byte PT%d O%d B%d I-1\n", t_id, offs, ch) < 0){
                printf("could not write to log file");
            }
            printf("ID %d --- offset %d char %c len%d\n", t_id, result.offset, result.data, len);
            addItem(&cBuff, &result);
            // Write to log file "produce PTn Ox Bb Ii  ".
            // Skip the write if the log could not be opened (lp is NULL).
            if(lp && fprintf(lp, "produce PT%d O%d B%d I%d\n", t_id, offs, ch, cBuff.lastInd) < 0){
                printf("could not write to log file");
            }
            if(lp)
                fclose(lp);
            len--;
        }
        pthread_mutex_unlock( &pt_mutex );
        sem_post(&full); /* signal full */
        /*** END CRITICAL SECTION ********************************/
        t.tv_sec = 0;
        t.tv_nsec = rand()%(TEN_MILLIS_IN_NANOS+1);
        nanosleep(&t, NULL);
    }
    inJoin[t_id] = 1; // This IN thread is ready to be joined.
    printf("EXIT INthread\n");
    pthread_exit(0);
}

Here is my consumer thread:

void *OUTthread(void *arg)
{
    printf("INSIDE OUTthread\n");
    struct timespec t;
    t.tv_sec = 0;
    t.tv_nsec = rand()%(TEN_MILLIS_IN_NANOS+1);
    nanosleep(&t, NULL);
    int processing = 1;
    FILE *targetFile, *lp;
    BufferItem OUTresult;
    int t_id = OUTid++;
    int offs, ch;
    int numBytes = len;
    while(processing){
        /*** CRITICAL SECTION ********************************/
        sem_wait(&full); /* acquire the full lock */
        pthread_mutex_lock( &pt_mutex );
        cbRead(&cBuff, &OUTresult);
        offs = OUTresult.offset;
        ch = OUTresult.data;
        // Write to log file "consume CTn Ox Bb Ii".
        if (!(lp = fopen(log, "a"))) {
            printf("could not open log file for writing");
        }
        else if(fprintf(lp, "consume CT%d O%d B%d I%d\n", t_id, offs, ch, cBuff.lastInd) < 0){
            printf("could not write to log file");
        }
        printf("From buffer: offset %d char %c\n", OUTresult.offset, OUTresult.data);
        if (!(targetFile = fopen(arg, "r+"))) {
            printf("could not open output file for writing");
            exit(-1); // Cannot continue: the calls below would dereference NULL.
        }
        if (fseek(targetFile, OUTresult.offset, SEEK_SET) == -1) {
            fprintf(stderr, "error setting output file position to %u\n",
                    (unsigned int) OUTresult.offset);
            exit(-1);
        }
        if (fputc(OUTresult.data, targetFile) == EOF) {
            fprintf(stderr, "error writing byte %d to output file\n", OUTresult.data);
            exit(-1);
        }
        // Write to log file "write_byte CTn Ox Bb I-1".
        // Skip the write if the log could not be opened (lp is NULL).
        if(lp && fprintf(lp, "write_byte CT%d O%d B%d I-1\n", t_id, offs, ch) < 0){
            printf("could not write to log file");
        }
        if(lp)
            fclose(lp);
        fclose(targetFile);
        pthread_mutex_unlock( &pt_mutex );
        sem_post(&empty); /* signal empty */
        /*** END CRITICAL SECTION ********************************/
        t.tv_sec = 0;
        t.tv_nsec = rand()%(TEN_MILLIS_IN_NANOS+1);
        nanosleep(&t, NULL);
    }
    outJoin[t_id] = 1; // This OUT thread is ready to be joined.
    printf("EXIT OUTthread\n");
    pthread_exit(0);
}

I would structure the code like this:

... processing ...
mutex lock
resource read / write
mutex unlock
... continue processing

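Do this for each resource that is shared. You end up with multiple mutexes: one for producers reading the source file, one for producers writing to the circular buffer, one for consumers reading from the circular buffer, and so on. Each mutex wraps a single read or write operation on its own resource.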
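As a rough illustration of one lock per resource, here is a minimal sketch of two producer-side helpers. The names `file_mutex`, `log_mutex`, `Item`, `read_next_byte` and `log_line` are hypothetical and do not appear in the question's code. The sketch also assumes the log file is opened once and shared, rather than reopened on every iteration.

```c
#include <assert.h>
#include <pthread.h>
#include <stdio.h>

/* Hypothetical per-resource locks: one for the source file, one for the log. */
static pthread_mutex_t file_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t log_mutex  = PTHREAD_MUTEX_INITIALIZER;

/* Hypothetical stand-in for the question's BufferItem. */
typedef struct { long offset; int data; } Item;

/* Read one byte and its offset. Only the shared FILE* is locked, and only
 * for the ftell/fgetc pair, so the offset always matches the byte read.
 * Returns 1 on success, 0 at end of file or on error. */
static int read_next_byte(FILE *src, Item *out)
{
    assert(src != NULL && out != NULL);

    pthread_mutex_lock(&file_mutex);
    long offs = ftell(src);
    int ch = fgetc(src);
    pthread_mutex_unlock(&file_mutex);

    if (offs == -1L || ch == EOF)
        return 0;
    out->offset = offs;
    out->data = ch;
    return 1;
}

/* Append one line to an already-open log. Only the log is locked, so a
 * thread that is logging does not block another thread reading the file.
 * Returns the fprintf result (negative on failure). */
static int log_line(FILE *logf, const char *event, int tid,
                    const Item *it, int index)
{
    pthread_mutex_lock(&log_mutex);
    int n = fprintf(logf, "%s PT%d O%ld B%d I%d\n",
                    event, tid, it->offset, it->data, index);
    pthread_mutex_unlock(&log_mutex);
    return n;
}
```

With helpers like these, a producer would call `read_next_byte`, then `log_line`, then add the item to the buffer under the buffer's own lock, so no thread holds more than one lock at a time.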

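I would also make sure your circular buffer cannot overwrite itself. Otherwise you cannot read from and write to the buffer at the same time, and you are left with a single mutex covering both reads and writes, which effectively increases the wait time of both consumer and producer threads.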
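One possible way to get a buffer that never overwrites unread data is to let the two counting semaphores guard the slots and use separate mutexes for the write index and the read index. The sketch below assumes POSIX unnamed semaphores (as on Linux). The names `Ring`, `ring_init`, `ring_put`, `ring_get` and `CAP` are hypothetical and are not the question's `addItem`/`cbRead` API.

```c
#include <assert.h>
#include <pthread.h>
#include <semaphore.h>

#define CAP 4  /* hypothetical capacity */

typedef struct { long offset; int data; } Item;

typedef struct {
    Item slots[CAP];
    int head;                    /* next slot to read  */
    int tail;                    /* next slot to write */
    sem_t empty;                 /* counts free slots   */
    sem_t full;                  /* counts filled slots */
    pthread_mutex_t head_mutex;  /* serializes consumers only */
    pthread_mutex_t tail_mutex;  /* serializes producers only */
} Ring;

static void ring_init(Ring *r)
{
    r->head = r->tail = 0;
    int rc = sem_init(&r->empty, 0, CAP);
    assert(rc == 0);
    rc = sem_init(&r->full, 0, 0);
    assert(rc == 0);
    pthread_mutex_init(&r->head_mutex, NULL);
    pthread_mutex_init(&r->tail_mutex, NULL);
}

/* Blocks while the buffer is full, so unread data is never overwritten.
 * Returns the index of the slot that was written. */
static int ring_put(Ring *r, const Item *it)
{
    sem_wait(&r->empty);
    pthread_mutex_lock(&r->tail_mutex);
    int idx = r->tail;
    r->slots[idx] = *it;
    r->tail = (idx + 1) % CAP;
    pthread_mutex_unlock(&r->tail_mutex);
    sem_post(&r->full);
    return idx;
}

/* Blocks while the buffer is empty. Returns the index of the slot read. */
static int ring_get(Ring *r, Item *out)
{
    sem_wait(&r->full);
    pthread_mutex_lock(&r->head_mutex);
    int idx = r->head;
    *out = r->slots[idx];
    r->head = (idx + 1) % CAP;
    pthread_mutex_unlock(&r->head_mutex);
    sem_post(&r->empty);
    return idx;
}
```

Because producers only take `tail_mutex` and consumers only take `head_mutex`, a producer and a consumer can work on different slots at the same time. The semaphores ensure that the slot being written and the slot being read are never the same one.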

This may not be the "best" solution, but it should run faster than having one giant lock around most of your code.
