我正在使用symfony2和FOSLasticaBundle。
我的弹性搜索服务经常因为未知的原因而被终止或失败。我已经将systemctl与restart always
一起作为临时修复程序。
尽管如此,如果关闭,当条令更新实体时执行索引更新的弹性搜索监听器会给我一个错误52:
无法连接到主机,Elasticsearch关闭?
因此,如果也使用FOSUserBundle更新最后一个用户连接日期,则在日志记录时会发生这种情况。如此依赖弹性搜索真是太烦人了。我已经为这个错误设置了一个异常侦听器,但我更希望捆绑包在稍后服务再次可用时保持更新。
查看捆绑文件,我发现:
vendor/friendsofsymfony/eelastica-bundle/Persister/ObjectPersister.php
public function replaceMany(array $objects)
{
$documents = array();
foreach ($objects as $object) {
$document = $this->transformToElasticaDocument($object);
$document->setDocAsUpsert(true);
$documents[] = $document;
}
try {
$this->type->updateDocuments($documents);
} catch (BulkException $e) {
$this->log($e);
}
}
这是一个服务,我跳进去的可以被如下覆盖,但这是另一个继承的类,子类被实例化,而不是作为服务调用,所以我不知道如何覆盖它。我怎么能呢?
try {
$this->type->updateDocuments($documents);
} catch (Exception $e) {
if ($e instanceof BulkException)
{
$this->log($e);
}
elseif ($e->getMessage() != "Couldn't connect to host, Elasticsearch down?")
{
throw $e;
}
}
那么,我如何确保下次提供服务时更新文档?
编辑:
我得到错误时的跟踪:
Stack Trace
in vendor/ruflin/elastica/lib/Elastica/Transport/Http.php at line 153 -
}
if ($errorNumber > 0) {
throw new HttpException($errorNumber, $request, $response);
}
return $response;
at Http ->exec (object(Request), array('connection' => array('config' => array('headers' => array()), 'host' => 'localhost', 'port' => '9200', 'logger' => 'fos_elastica.logger', 'enabled' => true)))
in vendor/ruflin/elastica/lib/Elastica/Request.php at line 167 +
at Request ->send ()
in vendor/ruflin/elastica/lib/Elastica/Client.php at line 587 +
at Client ->request ('_bulk', 'PUT', '{"update":{"_index":"foodmeup","_type":"user","_id":4}} {"doc":{"firstName":"Dominique","lastName":"Descamps","content":null,"username":"ddescamps","email":"ddescamps@ebp-paris.com","jobSeeker":{"skills":[],"experiences":[],"trainings":[]}},"doc_as_upsert":true} ', array())
in vendor/friendsofsymfony/elastica-bundle/Elastica/Client.php at line 47 +
at Client ->request ('_bulk', 'PUT', '{"update":{"_index":"foodmeup","_type":"user","_id":4}} {"doc":{"firstName":"Dominique","lastName":"Descamps","content":null,"username":"ddescamps","email":"ddescamps@ebp-paris.com","jobSeeker":{"skills":[],"experiences":[],"trainings":[]}},"doc_as_upsert":true} ', array())
in vendor/ruflin/elastica/lib/Elastica/Bulk.php at line 342 +
at Bulk ->send ()
in vendor/ruflin/elastica/lib/Elastica/Client.php at line 270 +
at Client ->updateDocuments (array(object(Document)))
in vendor/ruflin/elastica/lib/Elastica/Index.php at line 131 +
at Index ->updateDocuments (array(object(Document)))
in vendor/ruflin/elastica/lib/Elastica/Type.php at line 174 +
at Type ->updateDocuments (array(object(Document)))
in vendor/friendsofsymfony/elastica-bundle/Persister/ObjectPersister.php at line 144 +
at ObjectPersister ->replaceMany (array(object(User)))
in vendor/friendsofsymfony/elastica-bundle/Doctrine/Listener.php at line 151 +
at Listener ->persistScheduled ()
in vendor/friendsofsymfony/elastica-bundle/Doctrine/Listener.php at line 182 +
at Listener ->postFlush (object(PostFlushEventArgs))
in vendor/symfony/symfony/src/Symfony/Bridge/Doctrine/ContainerAwareEventManager.php at line 63 +
at ContainerAwareEventManager ->dispatchEvent ('postFlush', object(PostFlushEventArgs))
in vendor/doctrine/orm/lib/Doctrine/ORM/UnitOfWork.php at line 3318 +
at UnitOfWork ->dispatchPostFlushEvent ()
in vendor/doctrine/orm/lib/Doctrine/ORM/UnitOfWork.php at line 428 +
at UnitOfWork ->commit (null)
in vendor/doctrine/orm/lib/Doctrine/ORM/EntityManager.php at line 357 +
at EntityManager ->flush (null)
in src/AppBundle/Model/Classes/CustomBaseController.php at line 61 +
at CustomBaseController ->flush ()
in src/AppBundle/Controller/Core/VoteController.php at line 68 +
at VoteController ->voteAction (object(Request), 'up', 'Post', 'permettre-le-partage-de-documents-avec-les-equipes')
at call_user_func_array (array(object(VoteController), 'voteAction'), array(object(Request), 'up', 'Post', 'permettre-le-partage-de-documents-avec-les-equipes'))
in app/bootstrap.php.cache at line 3029 +
at HttpKernel ->handleRaw (object(Request), '1')
in app/bootstrap.php.cache at line 2991 +
at HttpKernel ->handle (object(Request), '1', true)
in app/bootstrap.php.cache at line 3140 +
at ContainerAwareHttpKernel ->handle (object(Request), '1', true)
in app/bootstrap.php.cache at line 2384 +
at Kernel ->handle (object(Request))
in web/app_dev.php at line 36 +
消息队列非常适合您的需求。每当您的模型更新时,您都会向MQ发送一条消息。这就是网络进程。然后您有一个工作线程池,这些工作线程使用MQ中的消息并尝试更新ES索引。如果ES现在关闭,则会出现异常,工作程序将死亡,消息将返回到队列。因此,一旦ES在线工作人员完成他们的工作,消息仍在MQ中。
相同的模式不仅可以用于ES,还可以用于任何其他第三方服务。例如,您想发送一封非常重要的电子邮件,但邮件服务器已关闭,您不能等待,现在您必须向客户发送回复。所以把它放到MQ中,让一个经纪人和工人来完成他们的工作。
下面是一段代码,说明如何使用入队MQ库来完成此操作。安装和配置非常容易,所以我将跳过它
标准侦听器必须替换为发送消息的侦听器:
<?php
use EnqueueClientProducerInterface;
class ElasticaUpdateIndexListener
{
private $producer;
public function __construct(ProducerInterface $producer)
{
$this->producer = $producer;
}
public function postPersist(LifecycleEventArgs $eventArgs)
{
$entity = $eventArgs->getObject();
$this->producer->sendCommand('elastica_index_entity', [
'entity' => $entity->getId(),
'type' => 'insert'
]);
}
public function postUpdate(LifecycleEventArgs $eventArgs)
{
$entity = $eventArgs->getObject();
$this->producer->sendCommand('elastica_index_entity', [
'entity' => $entity->getId(),
'type' => 'update'
]);
}
public function preRemove(LifecycleEventArgs $eventArgs)
{
$entity = $eventArgs->getObject();
$this->producer->sendCommand('elastica_index_entity', [
'entity' => $entity->getId(),
'type' => 'delete'
]);
}
}
此消息的处理器如下所示:
<?php
class ElasticaUpdateIndexProcessor implements PsrProcessor, CommandSubscriberInterface
{
private $doctrine;
protected $objectPersister;
protected $propertyAccessor;
private $indexable;
public function __construct(Registry $doctrine, ObjectPersisterInterface $objectPersister, IndexableInterface $indexable)
{
$this->indexable = $indexable;
$this->objectPersister = $objectPersister;
$this->propertyAccessor = PropertyAccess::createPropertyAccessor();
$this->doctrine = $doctrine;
}
public function process(PsrMessage $message, PsrContext $context)
{
$data = JSON::encode($message->getBody());
if ($data['type'] == 'delete') {
$this->objectPersister->deleteManyByIdentifiers([$data['entityId']]);
return self::ACK;
}
if (false == $entity = $this->doctrine->getManagerForClass($data['entityClass'])->find($data['entityId'])) {
return self::REJECT;
}
if (false == ($this->objectPersister->handlesObject($entity) && $this->isObjectIndexable($entity))) {
return self::ACK;
}
if ($data['type'] == 'insert') {
$this->objectPersister->insertMany([$this->scheduledForInsertion]);
return self::ACK;
}
if ($data['type'] == 'update') {
$this->objectPersister->replaceMany([$this->scheduledForInsertion]);
return self::ACK;
}
return self::REJECT;
}
private function isObjectIndexable($object)
{
return $this->indexable->isObjectIndexable(
$this->config['indexName'],
$this->config['typeName'],
$object
);
}
public static function getSubscribedCommand()
{
return 'elastica_index_entity';
}
}
并运行一些工人:
./bin/console enqueue:consume --setup-broker -vvv