如何(正确地)使用健壮的pthread进行进程同步



我们在生产系统中有一个错误,一个进程在持有共享内存互斥锁时发生了段错误。我们希望它在死亡时释放锁。我们使用sem_wait()/sem_post(),但是做了功课,我发现这个API不允许这样的行为:

http://www.usenetmessages.com/view.php?c=computer& g = 1074, id = 78029, p = 0

文章说,答案是使用健壮的pthreads API。我找到了以下关于这个主题的文章:

http://www.embedded-linux.co.uk/tutorial/mutex_mutandis

但是,实现了下面的代码,我有一个不可靠的行为,也就是说,如果我告诉进程3,例如,段故障,代码工作得很好。其他进程醒来,认识到一个进程在持有互斥锁时死亡,并恢复。但是,如果我告诉进程0死亡,或者我应该删除第63行上的睡眠调用,那么一旦失败的进程结束自己,其他进程就不会醒来。我做错了什么吗?

#include <stdio.h>
#include <stdlib.h>
#include <features.h>
#define __USE_POSIX
#include <signal.h>
#include <sys/types.h>
#include <unistd.h>
#define __USE_MISC
#include <sys/mman.h>
#include <fcntl.h>
#include <errno.h>
#define __USE_GNU   /* Necessario para usar a API PTHREAD_MUTEX_ROBUST_NP */
#define __USE_UNIX98 /* Necessario para usar a funcao pthread_mutexattr_settype */
#include <pthread.h>
#include <sys/wait.h>
static void *shrd;
static int child_main(int slot, int segfault) {
    pthread_mutex_t   *lock = (pthread_mutex_t *) shrd;
    int                err;
    if ( 0 != (err=pthread_mutex_lock(lock)) ) {
        switch(err) {
        case EINVAL:
            printf("Lock invalido no filho [%d]n", slot);
            goto excecao; 
        case EDEADLK:
            printf("O filho [%d] tentou travar um lock que jah possui.n", slot);
            break;
        case EOWNERDEAD:
            printf("Filho [%d] foi informado que o processo que estava com o lock morreu.n", slot);
            if ( 0 == pthread_mutex_consistent_np(lock) ) {
                printf("Filho [%d] retornou o lock para um estado consistente.n", slot);
            } else {
                fprintf(stderr, "Nao foi possivel retornar o lock a um estado consistente.n");
                goto desistir;
            }
            if ( 0 != (err=pthread_mutex_lock(lock)) ) {
                fprintf(stderr, "Apos recuperar o estado do lock, nao foi possivel trava-lo: %dn", err);
                goto desistir;
            }

        case ENOTRECOVERABLE:
            printf("O filho [%d] foi informado de que o lock estah permanentemente em estado inconsistente.n", slot);
            goto desistir;
        default:
            printf("Erro desconhecido ao tentar travar o lock no filho [%d]: [%d]n", slot, err);
            goto excecao; 
        }
    }
    printf("Filho [%d] adquiriu o lock.n", slot);
    if ( segfault == slot ) {
        printf("Matando o PID [%d] com SIGSEGV.n", getpid());
        kill(getpid(), SIGSEGV); 
    } else {
        sleep(1);
    }
    if ( 0 != (err = pthread_mutex_unlock(lock)) ) {
        switch (err) {
        case EPERM:
            printf("O filho [%d] tentou liberar o lock, mas nao o possui.n", slot);
            break;
        default:
            fprintf(stderr, "Erro inesperado ao liberar o lock do filho [%d]: [%d]n", slot, err);
        }
    } else {
        printf("Filho [%d] retornou o lock.n", slot);
    }
    return 0;
excecao:
    fprintf(stderr, "Programa terminado devido excecao.n");
    return 1;
desistir:
    fprintf(stderr, "A execucao do sistema nao deve prosseguir. Abortando todos os processos.n");
    kill(0, SIGTERM);
    /* unreachable */
    return 1;
}
int main(int argc, const char * const argv[]) {
    pid_t               filhos[10];
    int                 status;
    pid_t               p;
    int                 segfault = -1;
    pthread_mutexattr_t attrs;
    if ( argc > 1 ) {
        segfault = atoi(argv[1]);
        if ( segfault < 0 || segfault > 9 )
            segfault = -1;
    }
    if ( (shrd = mmap(NULL, sizeof(pthread_mutex_t), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0)) == MAP_FAILED ) {
        perror("Erro ao criar shrd mem:n");
        exit(1);
    }
    pthread_mutexattr_init         (&attrs);
    pthread_mutexattr_settype      (&attrs, PTHREAD_MUTEX_RECURSIVE_NP);
    pthread_mutexattr_setrobust_np (&attrs, PTHREAD_MUTEX_ROBUST_NP);
    pthread_mutexattr_setpshared   (&attrs, PTHREAD_PROCESS_SHARED);
    /* 
        Devido a um BUG na glibc 2.5 (que eh a usada pelo CentOS 5,
        a unica forma de fazer os mutexes robustos funcionarem eh
        setando o protocolo para PTHREAD_PRIO_INHERIT:
        http://sourceware.org/ml/libc-help/2010-04/msg00028.html
    */
    pthread_mutexattr_setprotocol  (&attrs, PTHREAD_PRIO_INHERIT);
    pthread_mutex_init             ((pthread_mutex_t*) shrd, &attrs);
    pthread_mutexattr_destroy      (&attrs);
    for (size_t i=0; i<sizeof(filhos)/sizeof(pid_t); ++i) {
        if ( (filhos[i]=fork()) == 0 ) {
            return child_main((int) i, segfault);
        } else {
            if ( filhos[i] < 0 ) {
                fprintf(stderr, "Erro ao criar o filho [%zu]. Abortando.n", i);
                exit(1);
            }
        }
    }
    for (size_t i=0; i<sizeof(filhos)/sizeof(pid_t); ++i) {
        do {
            p = waitpid(filhos[i], &status, 0);
        } while (p != -1);
    }
    printf("Pai encerrou a sua execucao.n");
    return 0;
}

BTW:我正在编译CentOS 5, 64位:

$ uname -rm
2.6.18-194.el5 x86_64
glibc-2.5-49
gcc-4.1.2-48.el5

(对不起,代码中的句子和注释是用我的母语葡萄牙语写的)

我尝试了一些其他方法,即:1. 使用POSIX屏障2. 让父线程在fork()时持有锁,并在每个子线程释放锁后增加一个计数器。

第一种方法根本不起作用,但我发布了我使用的源代码,因为我可能在使用API时犯了一些错误:

在child_main

:

pthread_barrier_t *barr = (pthread_barrier_t *) ((char *) shrd + sizeof(pthread_mutex_t));
...
int rc = pthread_barrier_wait(barr);
if(rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD)
{
   printf("Nao foi possivel esperar na barreira.n");
   exit(-1);
}
主要:

pthread_barrierattr_t   barr_attrs;
pthread_barrier_t      *barr;
...
initialize(pthread_barrierattr_init,       &barr_attrs);
initialize(pthread_barrierattr_setpshared, &barr_attrs, PTHREAD_PROCESS_SHARED);
barr = (pthread_barrier_t *) ((char *) shrd + sizeof(pthread_mutex_t));
if ( (init_result = pthread_barrier_init(barr, &barr_attrs, 10)) != 0 ) {
   printf("Nao foi possivel iniciar a barreira.n");
   exit(EXIT_FAILURE);
}

Initialize是一个宏,定义为:

 #define initialize(func, ...) 
 do { 
    init_result = func(__VA_ARGS__); 
    if ( 0 != init_result ) { 
      stored_errno = errno; 
      func_name = #func; 
      goto erro_criacao_semaforo; 
    } 
 } while(0);

第二种方法似乎有效:

在child_main

:

int               *contador = (int *) ((char *) shrd + sizeof(pthread_mutex_t) + sizeof(int));
...
int *n = (int *)(lock+1);
...
if ( 0 != (err=pthread_mutex_lock(lock)) ) {
...
主要:

volatile int           *n; // Cada filho iniciado incrementa esta variavel. 
                           // Qdo ela chega em 10, liberamos o lock.
...
n          = (int *) ((char *) shrd + sizeof(pthread_mutex_t));
...
pthread_mutex_lock(mutex);
for (i=0; i<sizeof(filhos)/sizeof(pid_t); ++i) {
... // the fork goes here.
}
while (*n != 10); // Isto garante que todos os filhos cheguem ao lock.
pthread_mutex_unlock(mutex);

但是一旦我添加了一个随机的睡眠时间,所以它们是不同步的,我又一次得到了死锁:

在child_main

:

int                num_sorteado;
struct timespec    dessincronizador = { 1, 0 };
int *n = (int *)(lock+1);
num_sorteado = 1 + (int) (999999.0 * (rand() / (RAND_MAX + 1.0)));
dessincronizador.tv_nsec = num_sorteado;
nanosleep(&dessincronizador, NULL);
if ( 0 != (err=pthread_mutex_lock(lock)) ) {
...

遗憾的是,似乎没有可靠的方法来了解进程在持有锁时是否死亡,因此解决我们问题的最佳方法是捕获正在死亡的进程上的信号并引发kill(0, SIGTERM)以使其他进程也死亡。

您的EOWNERDEAD块在ENOTRECOVERABLE块之前缺少break。此外,根据pthread_mutex_lock手册,在第一次调用pthread_mutex_lock()之后,即使返回EOWNERDEAD,也由调用者持有锁。因此,不应该在EOWNERDEAD的块中再次调用它。

最新更新