Talend ESB & RabbitMQ:将侦听器/消费者连接到队列



我目前正在尝试使用Talend开放工作室为ESB构建RabbitMQ队列的使用者。在阅读了我能找到的关于这个主题的每一篇帖子后,我有两个不同的版本,但没有一个真正有效。

版本1:

我的路线:

cMessagingEndpoint路由

cMessagingEndpoint1 URI:

"rabbitmq://host:port/exchange?username=xxxxx&密码=xxxx&队列=xxxx";

我还尝试向这个URI中添加了耐久=true和no_redeclare=true,但随后我收到了一个未知参数错误。所以我又去掉了那两个。

cMessagingEndpoint URI

cMessagingEndpoint骆驼组件

控制台输出:这似乎是旧语法,驱动程序错误导致连接关闭。

控制台输出

[WARN ] 09:29:07 org.apache.camel.component.rabbitmq.RabbitMQComponent- The old syntax rabbitmq://hostname:port/exchangeName is deprecated. You should configure the hostname on the component or ConnectionFactory
[WARN ] 09:29:08 com.rabbitmq.client.impl.ForgivingExceptionHandler- An unexpected connection driver error occured (Exception message: Connection reset)
[WARN ] 09:29:08 org.apache.camel.component.rabbitmq.RabbitConsumer- Unable to open channel for RabbitMQConsumer. Continuing and will try again

我认为驱动程序错误可能是由于我没有发送";持久:true,no_declayer:true",但我不确定在哪里添加这两个选项。

版本2:

路线2

我的ConnectionFactory

cMQConnectionFactory设置

定义的实际队列。

cAMQ设置

包括耐久性和no_dlare。必须定义这些,否则我永远无法连接到RabbitMQ服务器

cAMQ高级设置

但我再次收到一个未知的参数错误。

org.apache.camel.FailedToCreateRouteException: Failed to create route test_receiveMQMessage_cAMQP_1: Route(test_receiveMQMessage_cAMQP_1)[[From[cMQConnectionFact... because of Failed to resolve endpoint: cMQConnectionFactory1://queue:arcplace?durable=true&no_declare=true due to: Failed to resolve endpoint: cMQConnectionFactory1://queue:arcplace?durable=true&no_declare=true due to: There are 2 parameters that couldn't be set on the endpoint. Check the uri if the parameters are spelt correctly and that they are properties of the endpoint. Unknown parameters=[{durable=true, no_declare=true}]

有人知道我如何解决这个问题并将我的侦听器连接到RabbitMQ队列吗?

感谢

编辑:

为了测试并确保连接是可能的,我创建了一个ruby脚本来向交换发布消息,并使用发送到队列的消息。这样做没有任何问题。

我仍在努力让Talend Route/Job发挥作用。

编辑2:

我检查了RabbitMQ Broker的设置。它是裸机安装,支持TLS上的AMQP 0.9.1。不支持AMQP 1.0或MQTT。

要继续使用AMQP 0.9,必须使用带有rabbitmq-url(带有连接工厂bean的新语法(的cMessagingEndpoint

rabbitmq:myexchange?queue=myqueue

您必须注册一个连接工厂bean,camel将使用cBeanRegister自动检测该bean。我使用全局bean实例来传递上下文变量,如host、user等。cBeanRegister在简单模式中被配置为具有类名CCD_;上下文";(这是您的属性上下文(。您可以根据需要指定bean id。

public class RabbitConnectionFactory extends ConnectionFactory {
public RabbitConnectionFactory(Properties properties) {
setHost(properties.getProperty("amqp_host"));
setPort(Integer.parseInt(properties.getProperty("amqp_port")));
setUsername(properties.getProperty("amqp_user"));
setPassword(properties.getProperty("amqp_password"));
}
}

最新更新