我试图创建一个PHP脚本,在后台和分叉子进程运行。(我知道这会让服务器崩溃;在这个问题的范围之外还有一些额外的保护措施)
简单地说,代码是这样工作的:
$pids = array();
$ok = true;
while ($ok)
{
$job = $pheanstalk->watch('jobs')->ignore('default')->reserve();
echo 'Reserved job #' . $job->getId();
$pids[$job->getId()] = pcntl_fork();
if(!$pids[$job->getId()]) {
doStuff();
$pheanstalk->delete($job);
exit();
}
}
问题是,一旦我分叉进程,我得到错误:
Reserved job #0
PHP Fatal error: Uncaught exception 'Pheanstalk_Exception_ServerException' with message 'Cannot delete job 0: NOT_FOUND'
我的问题是,pheanstalk是如何返回一个没有ID和负载的作业的?它几乎感觉$pheanstalk一旦我叉子被损坏。如果我去掉分叉,一切都可以正常工作。(虽然它必须等待每个进程)
在删除phanstalk作业之前设置这个if条件:
if ($job) $pheanstalk->delete($job);
这是因为很有可能在代码到达此位置之前,文件的另一个php实例已经删除了该作业。(另一个实例仍然可以使用reserve()检索该作业,直到该作业从队列中删除。
出现此问题的原因是该作业由主进程保留。在调用pcntl_fork()
之后,实际上有一个$worker
变量的副本,因此主进程对作业有一个锁,当第二个作业试图删除时,它说它不存在(或者在这种情况下,它被另一个进程保留)。下面的代码通过创建一个新的worker来处理它,然后在主worker上释放作业,并尝试在第二个worker上拾取它。
# worker for the main process
$worker = new PheanstalkPheanstalk($host, $port, $timeout, $persistent);
$pid = -1;
# seek out new jobs
while ($job = $worker->watch('queue_caller')->reserve()) {
# make sure pcntl is installed & enabled
if (function_exists('pcntl_fork')) {
# create a child process
$pid = pcntl_fork();
}
if ($pid === -1) {
# code will run in single threaded mode
} elseif ($pid !== 0) {
# parent process
# release the job so it can be picked up by the fork
$worker->release($job);
# short wait (1/20000th second) to ensure the fork executes first
# adjust this value to what is appropriate for your environment
usleep(50);
# clear out zombie processes after they're completed
pcntl_waitpid(0, $pidStatus, WNOHANG);
# go back to looking for jobs
continue;
} else {
# child worker is needed, because it has to own the job to delete it
/** @var Pheanstalk $worker */
$worker = new PheanstalkPheanstalk($host, $port, $timeout, $persistent);
# only look for jobs for 1 second, in theory it should always find something
$job = $worker->watch('queue_caller')->reserve(1);
if (false === $job) {
# for some reason there is no job
# terminate the child process with an error code
exit(1);
}
}
/** Start your code **/
do_something();
/** End your code **/
# delete the job from the queue
$worker->delete($job);
# only terminate if it's the child process
if ($pid === 0) {
# terminate the child process with success code
exit(0);
}
}