Symfony Messenger with Apache Kafka as the queue transport



I don't know how to configure Kafka for Symfony Messenger. Everything works with RabbitMQ (I have created the message and the message handler):

.env:

MESSENGER_TRANSPORT_DSN=amqp://user:password@myhost:5672/%2f/messages

config/packages/messenger.yaml

framework:
    messenger:
        transports:
            async: "%env(MESSENGER_TRANSPORT_DSN)%"

.env (my attempt for Kafka):

MESSENGER_TRANSPORT_DSN=enqueue://node-1.kafka.myhost.com:9092/%2f/messages

config/packages/messenger.yaml

framework:
    messenger:
        transports:
            async: "%env(MESSENGER_TRANSPORT_DSN)%"

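Could someone give me a good working example? Thanks!

My development environment: Docker + CentOS 7 + PHP 7.3, NGINX.

Solution for this setup:

1. Install php-rdkafka (important: use version 3.1.x, and change the paths to match your PHP installation):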

yum -y install make librdkafka-devel && git clone --branch 3.1.x https://github.com/arnaud-lb/php-rdkafka.git && cd php-rdkafka && /path/to/php73/root/bin/phpize && ./configure --with-php-config=/path/to/php73/root/bin/php-config && make all -j 5 && make install

2. Add the PHP extension to php.ini:

[rdkafka]
extension=rdkafka.so
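
Before moving on, it can help to confirm that the extension is actually picked up by the PHP binary you run Symfony with. The following is a minimal sketch using only PHP built-ins; the helper name `describeExtension` is made up for this example and is not part of php-rdkafka.

```php
<?php
// Sanity check for step 2: is the rdkafka extension visible to this PHP binary?
// Run it with the same PHP binary that will run the Symfony console.

// Hypothetical helper, named for this example only.
function describeExtension(string $name): string
{
    if (!extension_loaded($name)) {
        return sprintf('%s: NOT loaded (check the php.ini used by this binary)', $name);
    }

    // phpversion() returns the extension's version string, or false if unavailable.
    $version = phpversion($name);

    return sprintf('%s: loaded, version %s', $name, $version !== false ? $version : 'unknown');
}

echo describeExtension('rdkafka'), PHP_EOL;
```

If it reports the extension as not loaded, the CLI and PHP-FPM may be reading different php.ini files, so check both.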

3. Install the packages for Symfony:

composer req symfony/messenger enqueue/rdkafka enqueue/enqueue-bundle sroze/messenger-enqueue-transport enqueue/async-event-dispatcher

4. Register the bundles by adding them to config/bundles.php:

Enqueue\Bundle\EnqueueBundle::class => ['all' => true],
Enqueue\MessengerAdapter\Bundle\EnqueueAdapterBundle::class => ['all' => true],

5. Add the file config/packages/enqueue.yaml:

enqueue:
    default:
        transport:
            dsn: "rdkafka://"
            global:
                group.id: 'myapp'
                metadata.broker.list: "%env(KAFKA_BROKER_LIST)%"
            topic:
                auto.offset.reset: beginning
            commit_async: true
        client: ~

6. Add the file config/packages/messenger.yaml:

framework:
    messenger:
        failure_transport: failed
        transports:
            async:
                dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
            failed:
                dsn: "doctrine://default?queue_name=failed"
        routing:
            'App\Message\EmailNotification': async

7. Add to .env:

###> messenger ###
MESSENGER_TRANSPORT_DSN=enqueue://default?topic[name]=messages&queue[name]=messages
KAFKA_BROKER_LIST=node-1.kafka.host:9092,node-2.kafka.host:9092,node-3.kafka.host:9092
###< messenger ###
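
To make the DSN easier to read, here is a small sketch that splits it with PHP's built-in `parse_url` and `parse_str`. It only illustrates how the string breaks down; it is not necessarily how the transport parses it internally. My understanding is that the host part (`default`) refers to the `default` key in enqueue.yaml, but treat that as an assumption.

```php
<?php
// Illustration only: split the DSN from .env into its parts with plain PHP.
$dsn = 'enqueue://default?topic[name]=messages&queue[name]=messages';

$parts = parse_url($dsn);

// Bracketed keys such as topic[name] become nested arrays.
parse_str($parts['query'] ?? '', $options);

echo 'scheme: ', $parts['scheme'], PHP_EOL;
echo 'host:   ', $parts['host'], PHP_EOL;
echo 'topic:  ', $options['topic']['name'], PHP_EOL;
echo 'queue:  ', $options['queue']['name'], PHP_EOL;
```

Both `topic[name]` and `queue[name]` are set to `messages` here, so the producer and the consumer refer to the same Kafka topic.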

8. Create the message and the message handler as described in the documentation: https://symfony.com/doc/current/messenger.html
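
For reference, a message and handler matching the `App\Message\EmailNotification` routing rule could look roughly like the sketch below. It assumes a Symfony 4.x/5.x project on PHP 7.3, where handlers implement `MessageHandlerInterface`; newer Symfony versions use the `#[AsMessageHandler]` attribute instead. The `content` field and the handler class name are made up for this example.

```php
<?php
// Sketch only: in a real project each class lives in its own file under src/.

namespace App\Message {
    // Plain data object; the "content" field is a hypothetical example payload.
    class EmailNotification
    {
        private $content;

        public function __construct(string $content)
        {
            $this->content = $content;
        }

        public function getContent(): string
        {
            return $this->content;
        }
    }
}

namespace App\MessageHandler {
    use App\Message\EmailNotification;
    use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

    // Hypothetical handler name; Messenger matches it to the message
    // through the type hint on __invoke().
    class EmailNotificationHandler implements MessageHandlerInterface
    {
        public function __invoke(EmailNotification $message): void
        {
            // Replace with the real work, e.g. sending the e-mail.
            echo 'Handling: ', $message->getContent(), PHP_EOL;
        }
    }
}
```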

9. Run the consumer:

php bin/console messenger:consume async
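
The consumer only has work to do once something dispatches a message. A minimal sketch of the producing side, assuming autowiring is enabled and that `App\Message\EmailNotification` takes a single string in its constructor (the service name `NotificationSender` is hypothetical):

```php
<?php

namespace App\Service;

use App\Message\EmailNotification;
use Symfony\Component\Messenger\MessageBusInterface;

// Hypothetical service, not part of the original answer.
class NotificationSender
{
    private $bus;

    public function __construct(MessageBusInterface $bus)
    {
        $this->bus = $bus;
    }

    public function send(string $content): void
    {
        // Sent to the "async" transport because of the routing rule
        // for App\Message\EmailNotification in messenger.yaml.
        $this->bus->dispatch(new EmailNotification($content));
    }
}
```

Adding `-vv` to the `messenger:consume` command prints each message as it is received, which makes it easier to confirm that messages are arriving from Kafka.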

Good luck!
