我尝试实现驻留在共享内存中的HaskellControl.Concurrent.MVar
,并允许使用POSIX功能在多个独立进程/程序之间进行通信。 但是我失败了很多僵局。
问题是pthread_cond_timedwait
有时不会返回在 GHC FFI 中调用(尽管是interruptible
或unsafe
(。 经过几天绝望的尝试来解决这个问题,我决定缩小代码并请求社区提供帮助。不幸的是,我无法将问题压缩为可粘贴在此处的几行代码。因此,我将(尽可能小的(代码存储在 github 上,以及有关如何复制问题的说明,这里是指向其当前状态(mvar-fail
分支(的永久链接。
从本质上讲,要获取和放置 mvar 的函数如下所示:
int mvar_take(MVar *mvar, ...) {
pthread_mutex_timedlock(&(mvar->statePtr->mvMut), &timeToWait);
while ( !(mvar->statePtr->isFull) ) {
pthread_cond_signal(&(mvar->statePtr->canPutC));
pthread_cond_timedwait(&(mvar->statePtr->canTakeC), &(mvar->statePtr->mvMut), &timeToWait);
}
memcpy(localDataPtr, mvar->dataPtr, mvar->statePtr->dataSize);
mvar->statePtr->isFull = 0;
pthread_mutex_unlock(&(mvar->statePtr->mvMut));
}
int mvar_put(MVar *mvar, ...) {
pthread_mutex_timedlock(&(mvar->statePtr->mvMut), &timeToWait);
while ( mvar->statePtr->isFull ) {
pthread_cond_signal(&(mvar->statePtr->canTakeC));
pthread_cond_timedwait(&(mvar->statePtr->canPutC), &(mvar->statePtr->mvMut), &timeToWait);
}
memcpy(mvar->dataPtr, localDataPtr, mvar->statePtr->dataSize);
mvar->statePtr->isFull = 1;
pthread_mutex_unlock(&(mvar->statePtr->mvMut));
}
(加上每个命令后的错误检查和 printfs(。mvar_take
的完整代码。 初始化过程如下:
pthread_mutexattr_init(&(s.mvMAttr));
pthread_mutexattr_settype(&(s.mvMAttr), PTHREAD_MUTEX_ERRORCHECK);
pthread_mutexattr_setpshared(&(s.mvMAttr), PTHREAD_PROCESS_SHARED);
pthread_mutex_init(&(s.mvMut), &(s.mvMAttr));
pthread_condattr_init(&(s.condAttr));
pthread_condattr_setpshared(&(s.condAttr), PTHREAD_PROCESS_SHARED);
pthread_cond_init(&(s.canPutC), &(s.condAttr));
pthread_cond_init(&(s.canTakeC), &(s.condAttr));
完整代码。 Haskell部分看起来像这样:
foreign import ccall interruptible "mvar_take"
mvar_take :: Ptr StoredMVarT -> Ptr a -> CInt -> IO CInt
foreign import ccall interruptible "mvar_put"
mvar_put :: Ptr StoredMVarT -> Ptr a -> CInt -> IO CInt
takeMVar :: Storable a => StoredMVar a -> IO a
takeMVar (StoredMVar _ fp) = withForeignPtr fp $ p -> alloca $ lp -> do
r <- mvar_take p lp
if r == 0
then peek lp
else throwErrno $ "takeMVar failed with code " ++ show r
putMVar :: Storable a => StoredMVar a -> a -> IO ()
putMVar (StoredMVar _ fp) x = withForeignPtr fp $ p -> alloca $ lp -> do
poke lp x
r <- mvar_put p lp
unless (r == 0)
$ throwErrno $ "putMVar failed with code " ++ show r
完整代码。 将 FFI 从interruptible
更改为unsafe
并不能防止死锁。 有时死锁每隔两次运行发生一次,有时仅在 50 次运行后发生(其余部分按预期执行(。
我的猜测是,GHC 可能会通过某些操作系统信号处理干扰 POSIX 互斥体的工作,但我不知道 GHC 的内部结构不足以验证它。
是我做了什么愚蠢的错误,还是我需要添加一些特殊的技巧才能让它在 GHC FFI 中工作?
附言:包含我的调查的自述文件的最新版本可在interprocess mvar-fail
.
更新 13.06.2018: 我试图通过以下内容将函数代码包围来暂时阻止所有操作系统信号:
sigset_t mask, omask;
sigfillset(&mask);
sigprocmask(SIG_SETMASK, &mask, &omask);
...
sigprocmask(SIG_SETMASK, &omask, NULL);
这无济于事。
好吧,正如预期的那样,这是我的错 - 一个非常 C 初学者的错误。 从初始化片段中可以看出,我将互斥体和条件变量保留在一个结构中。 从这里的代码片段中看不到,但可以通过我提供的链接(在 github 上(看到的是,我正在将该结构复制到共享内存中。互斥体不仅不允许这样做,而且在初始化结构中的所有内容之前,我还愚蠢地复制了它。
也就是说,我只是复制了一个 C 结构,我应该在其中设置一个指针。
这里最令人惊讶的是,代码有时仍然有效。 这是错误代码的链接。