我有以下实现使用Spring Boot应用程序来消费来自Azure服务总线的消息,但是我希望能够控制ServiceBusConsumer从使用Spring Boot配置文件属性自动开始侦听主题
在application.yaml
servicebus.consumer.enable=false
它应该禁用ServiceBusConsumer从侦听主题(s),以及我应该能够启动ServiceBusConsumer使用一个REST API -例如:./API/servicebus/consumer/start?
import com.microsoft.azure.servicebus.ExceptionPhase;
import com.microsoft.azure.servicebus.IMessage;
import com.microsoft.azure.servicebus.IMessageHandler;
import com.microsoft.azure.servicebus.ISubscriptionClient;
import lombok.extern.log4j.Log4j2;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.core.Ordered;
import org.springframework.stereotype.Component;
import java.util.concurrent.CompletableFuture;
@Log4j2
@Component
class ServiceBusConsumer implements Ordered {
private final ISubscriptionClient iSubscriptionClient;
ServiceBusConsumer(ISubscriptionClient isc) {
this.iSubscriptionClient = isc;
}
@EventListener(ApplicationReadyEvent.class)
public void consume() throws Exception {
this.iSubscriptionClient.registerMessageHandler(new IMessageHandler() {
@Override
public CompletableFuture<Void> onMessageAsync(IMessage message) {
log.info("received message " + new String(message.getBody()) + " with body ID " + message.getMessageId());
return CompletableFuture.completedFuture(null);
}
@Override
public void notifyException(Throwable exception, ExceptionPhase phase) {
log.error("eeks!", exception);
}
});
}
@Override
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE;
}
}
您可以通过添加@ConditionalOnProperty
注释来有条件地创建ServiceBusConsumer
bean,以确保仅在servicebus.consumer.enabled=true
:
@Log4j2
@Component
@ConditionalOnProperty(prefix = "servicebus.consumer", name = "enabled")
class ServiceBusConsumer implements Ordered {
...
}
官方文档指定您可以使用spring.cloud.azure.servicebus.enabled
属性来完全禁用消费者。
参考:https://microsoft.github.io/spring-cloud-azure/4.2.0/reference/html/index.html servicebus-connection-configuration