symfony2 FOSLasticaBundle:当弹性搜索服务关闭时,如何推迟文档更新



我正在使用symfony2和FOSLasticaBundle。

我的弹性搜索服务经常因为未知的原因而被终止或失败。我已经将systemctl与restart always一起作为临时修复程序。

尽管如此,如果关闭,当条令更新实体时执行索引更新的弹性搜索监听器会给我一个错误52:

无法连接到主机,Elasticsearch关闭?

因此,如果也使用FOSUserBundle更新最后一个用户连接日期,则在日志记录时会发生这种情况。如此依赖弹性搜索真是太烦人了。我已经为这个错误设置了一个异常侦听器,但我更希望捆绑包在稍后服务再次可用时保持更新。

查看捆绑文件,我发现:

vendor/friendsofsymfony/eelastica-bundle/Persister/ObjectPersister.php

public function replaceMany(array $objects)
{
$documents = array();
foreach ($objects as $object) {
$document = $this->transformToElasticaDocument($object);
$document->setDocAsUpsert(true);
$documents[] = $document;
}
try {
$this->type->updateDocuments($documents);
} catch (BulkException $e) {
$this->log($e);
}
}

这是一个服务,我跳进去的可以被如下覆盖,但这是另一个继承的类,子类被实例化,而不是作为服务调用,所以我不知道如何覆盖它。我怎么能呢?

try {
$this->type->updateDocuments($documents);
} catch (Exception $e) {
if ($e instanceof BulkException)
{
$this->log($e);
}
elseif ($e->getMessage() != "Couldn't connect to host, Elasticsearch down?")
{
throw $e;
}
}

那么,我如何确保下次提供服务时更新文档?

编辑:

我得到错误时的跟踪:

Stack Trace
in vendor/ruflin/elastica/lib/Elastica/Transport/Http.php at line 153   -
}
if ($errorNumber > 0) {
throw new HttpException($errorNumber, $request, $response);
}
return $response;
at Http ->exec (object(Request), array('connection' => array('config' => array('headers' => array()), 'host' => 'localhost', 'port' => '9200', 'logger' => 'fos_elastica.logger', 'enabled' => true))) 
in vendor/ruflin/elastica/lib/Elastica/Request.php at line 167   + 
at Request ->send () 
in vendor/ruflin/elastica/lib/Elastica/Client.php at line 587   + 
at Client ->request ('_bulk', 'PUT', '{"update":{"_index":"foodmeup","_type":"user","_id":4}} {"doc":{"firstName":"Dominique","lastName":"Descamps","content":null,"username":"ddescamps","email":"ddescamps@ebp-paris.com","jobSeeker":{"skills":[],"experiences":[],"trainings":[]}},"doc_as_upsert":true} ', array()) 
in vendor/friendsofsymfony/elastica-bundle/Elastica/Client.php at line 47   + 
at Client ->request ('_bulk', 'PUT', '{"update":{"_index":"foodmeup","_type":"user","_id":4}} {"doc":{"firstName":"Dominique","lastName":"Descamps","content":null,"username":"ddescamps","email":"ddescamps@ebp-paris.com","jobSeeker":{"skills":[],"experiences":[],"trainings":[]}},"doc_as_upsert":true} ', array()) 
in vendor/ruflin/elastica/lib/Elastica/Bulk.php at line 342   + 
at Bulk ->send () 
in vendor/ruflin/elastica/lib/Elastica/Client.php at line 270   + 
at Client ->updateDocuments (array(object(Document))) 
in vendor/ruflin/elastica/lib/Elastica/Index.php at line 131   + 
at Index ->updateDocuments (array(object(Document))) 
in vendor/ruflin/elastica/lib/Elastica/Type.php at line 174   + 
at Type ->updateDocuments (array(object(Document))) 
in vendor/friendsofsymfony/elastica-bundle/Persister/ObjectPersister.php at line 144   + 
at ObjectPersister ->replaceMany (array(object(User))) 
in vendor/friendsofsymfony/elastica-bundle/Doctrine/Listener.php at line 151   + 
at Listener ->persistScheduled () 
in vendor/friendsofsymfony/elastica-bundle/Doctrine/Listener.php at line 182   + 
at Listener ->postFlush (object(PostFlushEventArgs)) 
in vendor/symfony/symfony/src/Symfony/Bridge/Doctrine/ContainerAwareEventManager.php at line 63   + 
at ContainerAwareEventManager ->dispatchEvent ('postFlush', object(PostFlushEventArgs)) 
in vendor/doctrine/orm/lib/Doctrine/ORM/UnitOfWork.php at line 3318   + 
at UnitOfWork ->dispatchPostFlushEvent () 
in vendor/doctrine/orm/lib/Doctrine/ORM/UnitOfWork.php at line 428   + 
at UnitOfWork ->commit (null) 
in vendor/doctrine/orm/lib/Doctrine/ORM/EntityManager.php at line 357   + 
at EntityManager ->flush (null) 
in src/AppBundle/Model/Classes/CustomBaseController.php at line 61   + 
at CustomBaseController ->flush () 
in src/AppBundle/Controller/Core/VoteController.php at line 68   + 
at VoteController ->voteAction (object(Request), 'up', 'Post', 'permettre-le-partage-de-documents-avec-les-equipes') 
at call_user_func_array (array(object(VoteController), 'voteAction'), array(object(Request), 'up', 'Post', 'permettre-le-partage-de-documents-avec-les-equipes')) 
in app/bootstrap.php.cache at line 3029   + 
at HttpKernel ->handleRaw (object(Request), '1') 
in app/bootstrap.php.cache at line 2991   + 
at HttpKernel ->handle (object(Request), '1', true) 
in app/bootstrap.php.cache at line 3140   + 
at ContainerAwareHttpKernel ->handle (object(Request), '1', true) 
in app/bootstrap.php.cache at line 2384   + 
at Kernel ->handle (object(Request)) 
in web/app_dev.php at line 36   + 

消息队列非常适合您的需求。每当您的模型更新时,您都会向MQ发送一条消息。这就是网络进程。然后您有一个工作线程池,这些工作线程使用MQ中的消息并尝试更新ES索引。如果ES现在关闭,则会出现异常,工作程序将死亡,消息将返回到队列。因此,一旦ES在线工作人员完成他们的工作,消息仍在MQ中。

相同的模式不仅可以用于ES,还可以用于任何其他第三方服务。例如,您想发送一封非常重要的电子邮件,但邮件服务器已关闭,您不能等待,现在您必须向客户发送回复。所以把它放到MQ中,让一个经纪人和工人来完成他们的工作。

下面是一段代码,说明如何使用入队MQ库来完成此操作。安装和配置非常容易,所以我将跳过它

标准侦听器必须替换为发送消息的侦听器:

<?php
use EnqueueClientProducerInterface;
class ElasticaUpdateIndexListener
{
private $producer;
public function __construct(ProducerInterface $producer)
{
$this->producer = $producer;
}
public function postPersist(LifecycleEventArgs $eventArgs)
{
$entity = $eventArgs->getObject();
$this->producer->sendCommand('elastica_index_entity', [
'entity' => $entity->getId(),
'type' => 'insert'
]);
}
public function postUpdate(LifecycleEventArgs $eventArgs)
{
$entity = $eventArgs->getObject();
$this->producer->sendCommand('elastica_index_entity', [
'entity' => $entity->getId(),
'type' => 'update'
]);
}
public function preRemove(LifecycleEventArgs $eventArgs)
{
$entity = $eventArgs->getObject();
$this->producer->sendCommand('elastica_index_entity', [
'entity' => $entity->getId(),
'type' => 'delete'
]);
}
}

此消息的处理器如下所示:

<?php
class ElasticaUpdateIndexProcessor implements PsrProcessor, CommandSubscriberInterface
{
private $doctrine;
protected $objectPersister;
protected $propertyAccessor;
private $indexable;
public function __construct(Registry $doctrine, ObjectPersisterInterface $objectPersister, IndexableInterface $indexable)
{
$this->indexable = $indexable;
$this->objectPersister = $objectPersister;
$this->propertyAccessor = PropertyAccess::createPropertyAccessor();
$this->doctrine = $doctrine;
}
public function process(PsrMessage $message, PsrContext $context)
{
$data = JSON::encode($message->getBody());
if ($data['type'] == 'delete') {
$this->objectPersister->deleteManyByIdentifiers([$data['entityId']]);
return self::ACK;
} 
if (false == $entity = $this->doctrine->getManagerForClass($data['entityClass'])->find($data['entityId'])) {
return self::REJECT;
}
if (false == ($this->objectPersister->handlesObject($entity) && $this->isObjectIndexable($entity))) {
return self::ACK;
}
if ($data['type'] == 'insert') {
$this->objectPersister->insertMany([$this->scheduledForInsertion]);
return self::ACK;
}
if ($data['type'] == 'update') {
$this->objectPersister->replaceMany([$this->scheduledForInsertion]);
return self::ACK;
}
return self::REJECT;
}
private function isObjectIndexable($object)
{
return $this->indexable->isObjectIndexable(
$this->config['indexName'],
$this->config['typeName'],
$object
);
}
public static function getSubscribedCommand()
{
return 'elastica_index_entity';
}
}

并运行一些工人:

./bin/console enqueue:consume --setup-broker -vvv 

相关内容

  • 没有找到相关文章

最新更新