我正在使用带有Symfony 4信使组件的worker。
此工作人员是
- 接收消息(来自 rabbitMQ)
- 启动 FFMPEG
- 对视频进行处理
- 并将某些内容保存在数据库中。
为了在Symfony上配置这个worker,我已经这样做了(中间件很重要):
// config/packages/framework.yaml
framework:
messenger:
buses:
command_bus:
middleware:
# each time a message is handled, the Doctrine connection
# is "pinged" and reconnected if it's closed. Useful
# if your workers run for a long time and the database
# connection is sometimes lost
- doctrine_ping_connection
# After handling, the Doctrine connection is closed,
# which can free up database connections in a worker,
# instead of keeping them open forever
- doctrine_close_connection
transports:
ffmpeg:
dsn: '%env(CLOUDAMQP_URL)%'
options:
auto_setup: false
exchange:
name: amq.topic
type: topic
queues:
ffmpeg: ~
routing:
# Route your messages to the transports, for now all are AMQP messages
'AppApiMessageAMQPvideoFFMPEG': ffmpeg
## Handle multiple buses ? https://symfony.com/doc/current/messenger/multiple_buses.html
## When queries and command should be distinguished
然后,为了了解可能导致此问题的原因,我尝试调试信使以查看中间件是否正确配置
root@b9eec429cb54:/var/www/html# php bin/console debug:messenger
Messenger
=========
command_bus
-----------
The following messages can be dispatched:
------------------------------------------------------
AppApiMessageAMQPvideoFFMPEG
handled by AppApiMessageHandlerFFMPEGHandler
------------------------------------------------------
一切似乎都很好,对吧?
那么这怎么可能看到这个:
[2019-08-23 10:25:26] 信使。错误:重试应用程序\Api\消息\AMQP视频FFMPEG - 重试 #1。{"message":"[object] (App\Api\Message\AMQPvideoFFMPEG: {})","class":"App\Api\Message\AMQPvideoFFMPEG","retryCount":1,"error":"[object] (Doctrine\DBAL\Exception\ConnectionException(code: 0): 驱动程序中发生异常: SQLSTATE[HY000] [2002] 连接超时于/var/www/html/vendor/doctrine/dbal/lib/Doctrine/DBAL/Driver/AbstractMySQLDriver.php:93, Doctrine\DBAL\Driver\PDOException(code: 2002): SQLSTATE[HY000] [2002] 连接超时时间/var/www/html/vendor/doctrine/dbal/lib/Doctrine/DBAL/Driver/PDOConnection.php:31, PDOException(code: 2002): SQLSTATE[HY000] [2002] 连接超时于/var/www/html/vendor/doctrine/dbal/lib/Doctrine/DBAL/Driver/PDOConnection.php:27)"} []
我完全迷失了,我错过了什么吗?
这种情况有时会发生,但它大部分时间都有效,我想当我的工作线程失去与数据库的连接时会发生此错误,特别是如果 ffmpeg 处理持续 7 分钟或更长时间,但这应该通过 ping 和紧密连接的中间件来避免。所以我不清楚这里有什么问题。
在阅读了我的中间件的代码之后,尤其是这个块
https://github.com/symfony/symfony/blob/4.4/src/Symfony/Bridge/Doctrine/Messenger/DoctrinePingConnectionMiddleware.php
class DoctrinePingConnectionMiddleware extends AbstractDoctrineMiddleware
{
protected function handleForManager(EntityManagerInterface $entityManager, Envelope $envelope, StackInterface $stack): Envelope
{
$connection = $entityManager->getConnection();
if (!$connection->ping()) {
$connection->close();
$connection->connect();
}
if (!$entityManager->isOpen()) {
$this->managerRegistry->resetManager($this->entityManagerName);
}
return $stack->next()->handle($envelope, $stack);
}
}
我们可以看到我的处理程序在连接打开后立即被调用。 我认为这种行为应该有效,但是FFMPEG可以在很长一段时间内使用相同的RabbitMQ消息。因此,我的处理程序的最后一步会将某些内容插入数据库,可以提供mySQL已消失错误或连接超时。
这就是为什么,我把这个片段放到一个没有调用处理程序的东西的方法中,只有与 doctrine connect 相关的代码,然后我在插入我的数据库之前调用它,如下所示:
public function __invoke(AMQPvideoFFMPEG $message)
{
// reset connection if not found
$this->processService->testConnection();
$process = $this->processService->find($message->getProcess());
$this->renderServcie->updateQueue($process->getQueue(), "processing");
// some other stuff
}
其中 testConnection() 方法是
/**
* Reconnect if connection is aborted for some reason
*/
public function testConnection()
{
$connection = $this->entityManager->getConnection();
if (!$connection->ping()) {
$connection->close();
$connection->connect();
}
}
但在那之后我尝试了另一个问题
不支持重置非惰性管理器服务。设置 "doctrine.orm.default_entity_manager"服务懒惰且需要 "symfony/proxy-manager-bridge"在你的 composer.json 文件中。
安装"symfony/proxy-manager-bridge"后,错误消失了。
到目前为止,没有遇到连接超时的情况。走着瞧。
只需在执行任何插入操作之前断开连接:
public function handle(…)
{
// your time-consuming business logic
// disconnect if needed
if (!$this->entityManager->getConnection()->ping()) {
$this->entityManager->getConnection()->close();
}
// save your work
$this->entityManager->flush();
}