从Gearman的多个工作中获取回应,但暂停后流产



在简而言之中:我想在Gearman Client中呼叫runTasks()的总体超时。

我觉得我不能成为第一个想要这个的人,但是我找不到如何将其放在一起的例子。

这是我想要实现的:

  1. 在PHP脚本中,使用Gearman Client在并行中启动一系列作业
  2. 每个工作都会产生一些搜索结果,PHP脚本将需要处理
  3. 一些工作可能需要一些时间才能跑步,但我不想等待最慢的工作。相反,在n毫秒之后,我想处理已经完成的所有工作的结果,并忽略那些没有的工作。

需求1和2使用PHP GearMancLient的addTask((和RunTasks((方法很简单,但是直到提交的作业完成为止,这将限制为止,因此不满足3. <<<<<<<<<<<<<<<</p>

这是我到目前为止尝试过的一些方法:

  • 使用Settimeout((的超时设置测量连接已ive的时间,这不是我感兴趣的。
  • 使用背景作业或任务,似乎没有任何方法来检索工人返回的数据。已经涵盖了几个问题:1 2
  • AddTaskStatus((示例中的自定义轮询循环几乎是我需要的,但是它使用了后台作业,因此再次看不到任何结果。它还包括含糊的评论"更好的方法是使用事件回调",而无需解释其含义的含义,或者他们要替换的示例的哪一部分。
  • 客户端选项包括gearman_client_non_blocking模式,但我不明白如何使用此方法,如果非块runTasks()与使用setTaskBackground()而不是setTask()有所不同。

我已经看到了返回通信只能使用其他机制(例如共享数据存储(的建议,但是在这种情况下,我也可以抛弃齿轮并使用RabbitMQ构建自定义解决方案。

我认为我找到了一个可行的解决方案,尽管我仍然对替代方案感兴趣。

关键是在I/O超时后再次致电runTasks()继续等待先前的同步任务,因此您可以从这些部分中构建一个投票循环:

  • 同步,并行,用addTask()设置的任务。
  • 使用setCompleteCallback()的完成回调集,该回调集跟踪哪些任务已经完成以及仍在待处理中。
  • 使用setTimeout()的低I/O超时设置,它充当您的轮询频率。
  • 在循环中重复对runTasks()的呼叫,当所有任务完成或达到整体超时时,请退出。这也可能具有更复杂的退出条件,例如" n秒或至少x结果"等。

最大的缺点是超时发布了PHP警告,因此您必须用自定义错误处理程序或@操作员挤压。

这是一个经过全面测试的示例:

// How long should we wait each time around the polling loop if nothing happens
define('LOOP_WAIT_MS', 100);
// How long in total should we wait for responses before giving up
define('TOTAL_TIMEOUT_MS', 5000);
$client= new GearmanClient();
$client->addServer();
// This will fire as each job completes.
// In real code, this would save the data for later processing,
// as well as tracking which jobs were completed, tracked here with a simple counter.
$client->setCompleteCallback(function(GearmanTask $task) use (&$pending) {
        $pending--;
        echo "Complete!n";
        echo $task->data();
});
// This array can be used to track the tasks created. This example just counts them.
$tasks = [];
// Sample tasks; the workers sleep for specified number of seconds before returning some data.
$tasks[] = $client->addTask('wait', '2');
$tasks[] = $client->addTask('wait', '2');
$tasks[] = $client->addTask('wait', '2');
$tasks[] = $client->addTask('wait', '2');
$tasks[] = $client->addTask('wait', '2');
$tasks[] = $client->addTask('wait', '2');
$pending = count($tasks);
// This is the key polling loop; runTasks() here acts as "wait for a notification from the server"
$client->setTimeout(LOOP_WAIT_MS);
$start = microtime(true);
do {
        // This will abort with a PHP Warning if no data is received in LOOP_WAIT_MS milliseconds
        // We ignore the warning, and try again, unless we've reached our overall time limit
        @$client->runTasks();
} while (
        // Exit the loop once we run out of time
        microtime(true) - $start < TOTAL_TIMEOUT_MS / 1000
        // Additional loop exit if all tasks have been completed
        // This counter is decremented in the complete callback
        && $pending > 0
);
echo "Finished with $pending tasks unprocessed.n";

您的用例听起来像是为

创建的can_do_timeout。

can_do_timeout

 Same as CAN_DO, but with a timeout value on how long the job
 is allowed to run. After the timeout value, the job server will
 mark the job as failed and notify any listening clients.
 Arguments:
 - NULL byte terminated Function name.
 - Timeout value.

因此,对于任何(工人,功能(元组,您可以定义最长的时间来处理工作,否则将被丢弃。

不幸的是,在C服务器中似乎有一个错误,该错误在1000秒时对超时进行了编码。

一个解决方法是,如果您能够在Gearman之外实现超时逻辑。例如,如果您使用卷曲,肥皂,插座等,通常可以通过调整这些设置来实现所需的效果。

相关内容

  • 没有找到相关文章

最新更新