Apache Camel: writing to RabbitMQ



I am trying to use Apache Camel to read data from a file and write it to a RabbitMQ queue, but I end up with the following error:

    Exception in thread "main" org.apache.camel.FailedToCreateRouteException: Failed to create route route1 at: >>> To[rabbitmq://localhost:15672?queue=hello] <<< in route: Route(route1)[[From[file://target/?fileName=doctor.txt&chars... because of Failed to resolve endpoint: rabbitmq://localhost:15672?queue=hello due to: No URI path as the exchangeName for the RabbitMQEndpoint, the URI is rabbitmq://localhost:15672?queue=hello
    at org.apache.camel.model.RouteDefinition.addRoutes(RouteDefinition.java:945)
    at org.apache.camel.model.RouteDefinition.addRoutes(RouteDefinition.java:187)
    at org.apache.camel.impl.DefaultCamelContext.startRoute(DefaultCamelContext.java:794)
    at org.apache.camel.impl.DefaultCamelContext.startRouteDefinitions(DefaultCamelContext.java:2184)
    at org.apache.camel.impl.DefaultCamelContext.doStartCamel(DefaultCamelContext.java:1916)
    at org.apache.camel.impl.DefaultCamelContext.doStart(DefaultCamelContext.java:1777)
    at org.apache.camel.support.ServiceSupport.start(ServiceSupport.java:61)
    at org.apache.camel.impl.DefaultCamelContext.start(DefaultCamelContext.java:1745)
    at test.RMQCamelSender.main(RMQCamelSender.java:38)
Caused by: org.apache.camel.ResolveEndpointFailedException: Failed to resolve endpoint: rabbitmq://localhost:15672?queue=hello due to: No URI path as the exchangeName for the RabbitMQEndpoint, the URI is rabbitmq://localhost:15672?queue=hello
    at org.apache.camel.impl.DefaultCamelContext.getEndpoint(DefaultCamelContext.java:545)
    at org.apache.camel.util.CamelContextHelper.getMandatoryEndpoint(CamelContextHelper.java:71)
    at org.apache.camel.model.RouteDefinition.resolveEndpoint(RouteDefinition.java:202)
    at org.apache.camel.impl.DefaultRouteContext.resolveEndpoint(DefaultRouteContext.java:106)
    at org.apache.camel.impl.DefaultRouteContext.resolveEndpoint(DefaultRouteContext.java:112)
    at org.apache.camel.model.SendDefinition.resolveEndpoint(SendDefinition.java:61)
    at org.apache.camel.model.SendDefinition.createProcessor(SendDefinition.java:55)
    at org.apache.camel.model.ProcessorDefinition.makeProcessor(ProcessorDefinition.java:500)
    at org.apache.camel.model.ProcessorDefinition.addRoutes(ProcessorDefinition.java:213)
    at org.apache.camel.model.RouteDefinition.addRoutes(RouteDefinition.java:942)
    ... 8 more
Caused by: java.lang.IllegalArgumentException: No URI path as the exchangeName for the RabbitMQEndpoint, the URI is rabbitmq://localhost:15672?queue=hello
    at org.apache.camel.component.rabbitmq.RabbitMQComponent.createEndpoint(RabbitMQComponent.java:50)
    at org.apache.camel.component.rabbitmq.RabbitMQComponent.createEndpoint(RabbitMQComponent.java:31)
    at org.apache.camel.impl.DefaultComponent.createEndpoint(DefaultComponent.java:122)
    at org.apache.camel.impl.DefaultCamelContext.getEndpoint(DefaultCamelContext.java:525)
    ... 17 more

Below is my implementation of the classes that create the Camel context and the RabbitMQ queue.

RMQCamelSender.java

package test;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import test.Producer;
public class RMQCamelSender {
    public static void main(String[] argv) throws Exception {
        Producer queueProd = new Producer();
        queueProd.setupConnection();
        System.out.println(queueProd.toString());
        CamelContext context = new DefaultCamelContext();
        context.addRoutes(new RouteBuilder() {
            public void configure() throws Exception {
                System.out.println("hello world");
                from("file://target/?fileName=doctor.txt&charset=utf-8")
                        .process(new Processor() {
                            public void process(Exchange msg) throws Exception {
                                System.out.println(msg.getIn().getBody(
                                        String.class));
                            }
                        }).to("rabbitmq://localhost:15672?queue=hello");
            }
        });
        context.start();
        Thread.sleep(4000);
        context.stop();
    }
}

Producer.java

package test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
public class Producer {
    public static final String QUEUE_NAME = "hello";
    public static Connection connection;
    public void setupConnection() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    }
}

If I do not use Camel and instead talk to the queue with the standard RabbitMQ client library, the program works fine.

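The error message is quite clear: the exchange name is missing from your endpoint URI. It cannot be empty, because the official Camel documentation says the URI must follow this format: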

rabbitmq://hostname[:port]/exchangeName?[options]

I suggest trying the amq.direct exchange (see the official RabbitMQ documentation for more details), like this:

public void configure() throws Exception {
    System.out.println("hello world");
    from("file://target/?fileName=doctor.txt&charset=utf-8")
        .process(new Processor() {
            public void process(Exchange msg) throws Exception {
                System.out.println(msg.getIn().getBody(String.class));
            }
        }).to("rabbitmq://localhost:5672/amq.direct?routingKey=hello");
}

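Also, you are using port 15672, which by default is the port of RabbitMQ's web management console. I assume you have not changed the default settings, so the port needs to be 5672. You must also use the routingKey parameter instead of the queue parameter, because in RabbitMQ semantics you publish to exchange:routingKey and only consume from queues.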
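The three checks from this answer (exchange name in the URI path, AMQP port rather than the management port, routingKey rather than queue) can be sketched with plain java.net.URI. This is only an illustration: RabbitEndpointUriCheck and its methods are hypothetical names, not part of Camel, and Camel's own endpoint validation may differ.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Camel: inspects a rabbitmq:// endpoint URI
// and reports the problems discussed in the answer above.
class RabbitEndpointUriCheck {

    // Returns the exchange name (the URI path without its leading slash),
    // or an empty string when the URI has no path.
    static String exchangeName(String endpointUri) {
        String path = URI.create(endpointUri).getPath();
        return (path == null || path.length() <= 1) ? "" : path.substring(1);
    }

    // Collects a human-readable message for each problem found.
    static List<String> problems(String endpointUri) {
        URI uri = URI.create(endpointUri);
        List<String> found = new ArrayList<>();
        if (exchangeName(endpointUri).isEmpty()) {
            found.add("missing exchange name in the URI path");
        }
        if (uri.getPort() == 15672) {
            found.add("port 15672 is the default management console port; AMQP defaults to 5672");
        }
        String query = uri.getQuery();
        if (query == null || !query.contains("routingKey=")) {
            found.add("no routingKey option");
        }
        return found;
    }

    public static void main(String[] args) {
        // URI from the question, then the corrected URI from this answer.
        System.out.println(problems("rabbitmq://localhost:15672?queue=hello"));
        System.out.println(problems("rabbitmq://localhost:5672/amq.direct?routingKey=hello"));
    }
}
```

Running main prints the list of problems for the URI from the question and an empty list for the corrected one.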

You need to include the RabbitMQ exchange name. It is missing here:

.to("rabbitmq://localhost:15672?queue=hello");

It should look like this:

.to("rabbitmq://localhost:5672/exchangeName?routingKey=hello");

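Also, if you are sending to an exchange, why specify a queue? You only need to specify the routing key; if a binding exists for that routing key, the exchange will deliver the message to the bound queue.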
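To make the binding rule concrete, here is a toy in-memory model of a direct exchange. It is a sketch for illustration only: ToyDirectExchange is a hypothetical class, and it does not reflect how RabbitMQ is implemented. With the real Java client, a binding is created with Channel.queueBind(queue, exchange, routingKey). Note that the Producer in the question declares the queue "hello" but never binds it to a named exchange, so a binding would presumably have to be added for messages published to that exchange to reach the queue.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a direct exchange: a message reaches a queue only if that queue
// was bound with a routing key equal to the one used when publishing.
class ToyDirectExchange {
    // routing key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();
    // queue name -> messages delivered to that queue
    private final Map<String, List<String>> queues = new HashMap<>();

    // Binds a queue to this exchange under the given routing key.
    void bind(String queue, String routingKey) {
        queues.computeIfAbsent(queue, q -> new ArrayList<>());
        bindings.computeIfAbsent(routingKey, k -> new ArrayList<>()).add(queue);
    }

    // Delivers the message to every matching queue and returns how many received it.
    int publish(String routingKey, String body) {
        List<String> targets = bindings.getOrDefault(routingKey, new ArrayList<>());
        for (String queue : targets) {
            queues.get(queue).add(body);
        }
        return targets.size();
    }

    // Returns the messages currently held by the named queue.
    List<String> messagesIn(String queue) {
        return queues.getOrDefault(queue, new ArrayList<>());
    }

    public static void main(String[] args) {
        ToyDirectExchange exchange = new ToyDirectExchange();
        exchange.bind("hello", "hello");
        System.out.println(exchange.publish("hello", "contents of doctor.txt"));
        System.out.println(exchange.publish("other", "no binding for this key"));
        System.out.println(exchange.messagesIn("hello"));
    }
}
```

In this model a publish with an unbound routing key reaches no queue, which is why the routing key in the endpoint URI has to match an existing binding.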
