我目前正在尝试使用Talend开放工作室为ESB构建RabbitMQ队列的使用者。在阅读了我能找到的关于这个主题的每一篇帖子后,我有两个不同的版本,但没有一个真正有效。
版本1:
我的路线:
cMessagingEndpoint路由
cMessagingEndpoint1 URI:
"rabbitmq://host:port/exchange?username=xxxxx&密码=xxxx&队列=xxxx";
我还尝试向这个URI中添加了耐久=true和no_redeclare=true,但随后我收到了一个未知参数错误。所以我又去掉了那两个。
cMessagingEndpoint URI
cMessagingEndpoint骆驼组件
控制台输出:这似乎是旧语法,驱动程序错误导致连接关闭。
控制台输出
[WARN ] 09:29:07 org.apache.camel.component.rabbitmq.RabbitMQComponent- The old syntax rabbitmq://hostname:port/exchangeName is deprecated. You should configure the hostname on the component or ConnectionFactory
[WARN ] 09:29:08 com.rabbitmq.client.impl.ForgivingExceptionHandler- An unexpected connection driver error occured (Exception message: Connection reset)
[WARN ] 09:29:08 org.apache.camel.component.rabbitmq.RabbitConsumer- Unable to open channel for RabbitMQConsumer. Continuing and will try again
我认为驱动程序错误可能是由于我没有发送";持久:true,no_declayer:true",但我不确定在哪里添加这两个选项。
版本2:
路线2
我的ConnectionFactory
cMQConnectionFactory设置
定义的实际队列。
cAMQ设置
包括耐久性和no_dlare。必须定义这些,否则我永远无法连接到RabbitMQ服务器
cAMQ高级设置
但我再次收到一个未知的参数错误。
org.apache.camel.FailedToCreateRouteException: Failed to create route test_receiveMQMessage_cAMQP_1: Route(test_receiveMQMessage_cAMQP_1)[[From[cMQConnectionFact... because of Failed to resolve endpoint: cMQConnectionFactory1://queue:arcplace?durable=true&no_declare=true due to: Failed to resolve endpoint: cMQConnectionFactory1://queue:arcplace?durable=true&no_declare=true due to: There are 2 parameters that couldn't be set on the endpoint. Check the uri if the parameters are spelt correctly and that they are properties of the endpoint. Unknown parameters=[{durable=true, no_declare=true}]
有人知道我如何解决这个问题并将我的侦听器连接到RabbitMQ队列吗?
感谢
编辑:
为了测试并确保连接是可能的,我创建了一个ruby脚本来向交换发布消息,并使用发送到队列的消息。这样做没有任何问题。
我仍在努力让Talend Route/Job发挥作用。
编辑2:
我检查了RabbitMQ Broker的设置。它是裸机安装,支持TLS上的AMQP 0.9.1。不支持AMQP 1.0或MQTT。
要继续使用AMQP 0.9,必须使用带有rabbitmq-url(带有连接工厂bean的新语法(的cMessagingEndpoint
:
rabbitmq:myexchange?queue=myqueue
您必须注册一个连接工厂bean,camel将使用cBeanRegister
自动检测该bean。我使用全局bean实例来传递上下文变量,如host、user等。cBeanRegister在简单模式中被配置为具有类名CCD_;上下文";(这是您的属性上下文(。您可以根据需要指定bean id。
public class RabbitConnectionFactory extends ConnectionFactory {
public RabbitConnectionFactory(Properties properties) {
setHost(properties.getProperty("amqp_host"));
setPort(Integer.parseInt(properties.getProperty("amqp_port")));
setUsername(properties.getProperty("amqp_user"));
setPassword(properties.getProperty("amqp_password"));
}
}