Symfony5信使,相同消息处理程序的并行队列



Symfony messenger:

https://symfony.com/doc/current/messenger.html

问题:

池#1 = (user1创建一个Job信使,Job被拆分为10个信使Message(
Pool#2 = (user2创建一个JobJob被拆分为10个信使Message(

...
Pool#100 = (user100创建一个JobJob被拆分为10个信使的Message(

池#100 将不会执行,直到所有以前的池都未完成。

目标:

我需要并行队列,所有池都将单独运行,因此每个池都有个人队列。

代码示例:

config/packages/messenger.yaml
framework:
messenger:
transports:
sync: "%env(MESSENGER_TRANSPORT_DSN)%"
routing:
'AppMessageJob': sync
src/Message/Job.php
<?php
namespace AppMessage;
class Job
{
private $content;
public function __construct(string $content)
{
$this->content = $content;
}
public function getContent(): string
{
return $this->content;
}
}
src/MessageHandler/JobHandler.php
<?php
namespace AppMessageHandler;
use AppMessageJob;
use SymfonyComponentMessengerHandlerMessageHandlerInterface;
class JobHandler implements MessageHandlerInterface
{
public function __construct()
{}
public function __invoke(Job $message)
{
$params = json_decode($message->getContent(), true);
dump($params);
}
}
src/Controller/JobController.php
<?php
namespace AppController;
use SymfonyBundleFrameworkBundleControllerAbstractController;
use SymfonyComponentHttpFoundationJsonResponse;
use SymfonyComponentHttpFoundationRequest;
use SymfonyComponentMessengerMessageBusInterface;
use SymfonyComponentRoutingAnnotationRoute;
/**
* @Route("/job")
*/
class JobController extends AbstractController
{
/**
* @Route("/create", name="app_job_create")
* @param Request $request
* @param MessageBusInterface $bus
* @return JsonResponse
*/
public function create(Request $request, MessageBusInterface $bus): JsonResponse
{
// ...
$entityId = $entity->getId();
// ...
for ($i = 0; $i < 10; $i++) {
$params['entityId'] = $entityId;
$params['counter'] = $i;
$bus->dispatch(new Job(json_encode($params)));
}
return new JsonResponse([]);
}
}

更多信息:

我想继续使用它,但找不到最简单的解决方案来传递一些唯一的队列名称或 id,然后告诉工人他必须只处理这个Messages池 .
我找到了自定义传输 https://symfony.com/doc/current/messenger/custom-transport.html,但我不确定它是否有帮助。至少我认为只有自定义传输是不够的.
我读过Actor modelshttps://www.brianstorti.com/the-actor-model/但我想只使用 Messenger+Redis,如果可能的话。

可能这里没有解决方案,这个信使还不能处理并行队列。无论如何,我很高兴有任何帮助.
谢谢!

我最终通过使用动态队列名称解决了这个问题.
不幸的是,我被迫拒绝了信使。

另外,symfony 5(2020 年 6 月(目前不支持 RabbitMQ https://github.com/php-amqplib/RabbitMqBundle 的 SDK,我不确定 100%,但我在 3 版中使用了它,但我无法将其放在 5 版。所以我用了另一个。

这是一个很好的快速入门指南 https://blog.programster.org/rabbitmq-job-queue-with-php
从示例中,我将RABBITMQ_QUEUE_NAME更改为动态名称,并且效果很好。

然后一切照常,养育 RabbitMQ 并配置主管 https://symfony.com/doc/current/messenger.html#supervisor-configuration

如果这对某人有帮助,我会很高兴,谢谢!

最新更新