使用AsyncRabbitTemplate添加非阻塞消费者



我是RabbitMQ的新手,想构建一个非阻塞消费者。我参照了《在使用带有请求/回复模式的AsyncRabbitTemplate时如何构建非阻塞消费者》这个问题的回答,创建了一个应用程序并部署在本地。但是我得到了AmqpReplyTimeoutException。

我在这里附上一些日志:


2020-03-30 23:13:33,781 DEBUG [main] RabbitTemplate []>: Publishing message [(Body:'test' MessageProperties [headers={}, correlationId=a6a94f00-c5a3-4f41-a5b9-69293bc6d153, replyTo=amq.rabbitmq.reply-to, contentType=text/plain, contentEncoding=UTF-8, contentLength=4, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])] on exchange [], routingKey = [tax.webflux.reactor.determine]
AmqpReplyTimeoutException [Reply timed out, requestMessage=(Body:'test' MessageProperties [headers={}, correlationId=a6a94f00-c5a3-4f41-a5b9-69293bc6d153, replyTo=amq.rabbitmq.reply-to, contentType=text/plain, contentEncoding=UTF-8, contentLength=4, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])]
at org.springframework.amqp.rabbit.AsyncRabbitTemplate$RabbitFuture$TimeoutTask.run(AsyncRabbitTemplate.java:757)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)

这是我的类:


import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.PostConstruct;
import org.springframework.amqp.core.Address;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.AsyncRabbitTemplate;
import org.springframework.amqp.rabbit.AsyncRabbitTemplate.RabbitConverterFuture;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SettableListenableFuture;
import com.rabbitmq.client.Channel;
/**
 * Demo of a non-blocking request/reply consumer built on {@link AsyncRabbitTemplate}.
 *
 * <p>On startup the application declares a durable topic exchange, a durable queue and a
 * binding between them, then sends one request and prints the reply (or the failure).
 * The {@link #listen} method hands each request to a worker thread, so the listener
 * container thread is released immediately; the reply is published and the delivery
 * is acknowledged from the worker's completion callback.
 */
@ComponentScan("com.sap.slh.tax.*")
@SpringBootApplication
@EnableAsync
public class SpringwebfluxdemoApplication {

    /** Topic exchange the request is published to. */
    private static final String EXCHANGE_NAME = "tax.webflux.reactor.TAXSERVICE";

    /** Queue consumed by {@link #listen}. */
    private static final String QUEUE_NAME = "tax.webflux.reactor.queue";

    /** Routing key that binds {@link #QUEUE_NAME} to {@link #EXCHANGE_NAME}. */
    private static final String ROUTING_KEY = "tax.webflux.reactor.determine";

    /** Worker pool that runs the (simulated) slow request handling off the listener thread. */
    private final ExecutorService exec = Executors.newCachedThreadPool();

    @Autowired
    private RabbitTemplate template;

    @Autowired
    private AmqpAdmin amqpAdmin;

    /**
     * Sends a single test request once the application has started and prints the outcome.
     *
     * @param asyncTemplate template used for the asynchronous request/reply exchange
     * @return the runner executed by Spring Boot after start-up
     */
    @Bean
    public ApplicationRunner runner(AsyncRabbitTemplate asyncTemplate) {
        return args -> {
            // Publish to the declared exchange. The two-argument overload would treat the
            // string as a routing key on the default exchange (""), which only routes to a
            // queue of exactly that name; no such queue exists, so the request would be
            // dropped and the future would fail with AmqpReplyTimeoutException.
            RabbitConverterFuture<Object> future =
                    asyncTemplate.convertSendAndReceive(EXCHANGE_NAME, ROUTING_KEY, "test");
            future.addCallback(
                    reply -> System.out.println("Reply: " + reply),
                    failure -> failure.printStackTrace());
        };
    }

    /**
     * Creates the asynchronous template on top of the auto-configured {@link RabbitTemplate}.
     *
     * @param template the synchronous template to delegate to
     * @return the asynchronous template
     */
    @Bean
    public AsyncRabbitTemplate asyncTemplate(RabbitTemplate template) {
        return new AsyncRabbitTemplate(template);
    }

    /** Declares the exchange, the queue and the binding between them on the broker. */
    @PostConstruct
    public void initializeQueue() {
        TopicExchange taxServiceTopicExchange =
                (TopicExchange) ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
        Queue taxAttributesDeterminationQueue = QueueBuilder.durable(QUEUE_NAME).build();
        Binding binding = BindingBuilder.bind(taxAttributesDeterminationQueue)
                .to(taxServiceTopicExchange)
                .with(ROUTING_KEY);
        amqpAdmin.declareExchange(taxServiceTopicExchange);
        amqpAdmin.declareQueue(taxAttributesDeterminationQueue);
        amqpAdmin.declareBinding(binding);
    }

    /**
     * Receives a request, processes it asynchronously and publishes the reply.
     *
     * <p>The delivery is settled by this method's callbacks rather than by the container,
     * hence {@code ackMode = "MANUAL"} (attribute available since Spring AMQP 2.2; on older
     * versions set {@code spring.rabbitmq.listener.simple.acknowledge-mode=manual} instead).
     * Without manual mode the container acknowledges as soon as this method returns and the
     * later {@code basicAck} would target an already-settled delivery tag.
     *
     * @param in            request payload
     * @param channel       channel the delivery arrived on, used to ack/reject it
     * @param tag           delivery tag of the request
     * @param correlationId correlation id copied onto the reply so the requester can match it
     * @param replyTo       reply address supplied by the requester
     */
    @RabbitListener(queues = QUEUE_NAME, ackMode = "MANUAL")
    public void listen(String in, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag,
            @Header(AmqpHeaders.CORRELATION_ID) String correlationId,
            @Header(AmqpHeaders.REPLY_TO) String replyTo) {
        ListenableFuture<String> future = handleInput(in);
        future.addCallback(result -> {
            Address address = new Address(replyTo);
            this.template.convertAndSend(address.getExchangeName(), address.getRoutingKey(), result, m -> {
                m.getMessageProperties().setCorrelationId(correlationId);
                return m;
            });
            acknowledge(channel, tag);
        }, t -> {
            t.printStackTrace();
            // With manual acks every outcome must settle the delivery; drop it without
            // requeueing so a failing request is not redelivered in a loop.
            reject(channel, tag);
        });
    }

    /** Acknowledges a single delivery, reporting (not propagating) channel errors. */
    private void acknowledge(Channel channel, long tag) {
        try {
            channel.basicAck(tag, false);
        }
        catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Rejects a single delivery without requeueing, reporting (not propagating) channel errors. */
    private void reject(Channel channel, long tag) {
        try {
            channel.basicReject(tag, false);
        }
        catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Simulates slow processing on a worker thread: waits two seconds, then completes the
     * returned future with the upper-cased input.
     *
     * @param in request payload
     * @return future completed with {@code in.toUpperCase()}
     */
    private ListenableFuture<String> handleInput(String in) {
        SettableListenableFuture<String> future = new SettableListenableFuture<String>();
        exec.execute(() -> {
            try {
                Thread.sleep(2000);
            }
            catch (InterruptedException e) {
                // Preserve the interrupt status for the pool; the result is still delivered.
                Thread.currentThread().interrupt();
            }
            future.set(in.toUpperCase());
        });
        return future;
    }

    public static void main(String[] args) {
        SpringApplication.run(SpringwebfluxdemoApplication.class, args);
    }
}

我的application.yml文件:

---
# RabbitMQ connection settings; each value falls back to a local default
# when the corresponding vcap.services.* placeholder is not defined.
spring:
  rabbitmq:
    host: "${vcap.services.rabbitmq.credentials.hostname:localhost}"
    password: "${vcap.services.rabbitmq.credentials.password:guest}"
    port: "${vcap.services.rabbitmq.credentials.port:5672}"
    username: "${vcap.services.rabbitmq.credentials.username:guest}"
    virtual_host: "${vcap.services.rabbitmq.credentials.virtual_host:/}"

如有任何帮助,我们将不胜感激。提前谢谢。

那个回答已经是18个月前的了;该框架现在支持让侦听器方法返回`ListenableFuture<?>`或`Mono<?>`。

请参阅异步@RabbitListener返回类型。

然后,您就不需要侦听器中的所有代码了,只需完成返回的值即可。

相关内容

最新更新