如何在使用Spring Integration向主题发送消息时获取Azure服务总线消息id



使用Spring Integration向Azure服务总线上的主题发送消息后,我想获得Azure生成的消息id。我可以使用JMS做到这一点。是否有一种方法可以使用Spring Integration来做到这一点?我正在使用的代码:

@Service
public class ServiceBusDemo {
private static final String OUTPUT_CHANNEL = "topic.output";
private static final String TOPIC_NAME = "my_topic";
@Autowired
TopicOutboundGateway messagingGateway;
public String send(String message) {
// How can I get the Azure message id after sending here?
this.messagingGateway.send(message);
return message;
}
@Bean
@ServiceActivator(inputChannel = OUTPUT_CHANNEL)
public MessageHandler topicMessageSender(ServiceBusTopicOperation topicOperation) {
DefaultMessageHandler handler = new DefaultMessageHandler(TOPIC_NAME, topicOperation);
handler.setSendCallback(new ListenableFutureCallback<>() {
@Override
public void onSuccess(Void result) {
System.out.println("Message was sent successfully to service bus.");
}
@Override
public void onFailure(Throwable ex) {
System.out.println("There was an error sending the message to service bus.");
}
});
return handler;
}
@MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
public interface TopicOutboundGateway {
void send(String text);
}
}

你可以使用ChannelInterceptor获取消息头:

public class CustomChannelInterceptor implements ChannelInterceptor {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
//key of the message-id header is not stable, you should add logic here to check which header key should be used here.
//ref: https://github.com/Azure/azure-sdk-for-java/tree/main/sdk/spring/azure-spring-cloud-starter-servicebus#support-for-service-bus-message-headers-and-properties
String messageId = message.getHeaders().get("message-id-header-key").toString();
return ChannelInterceptor.super.preSend(message, channel);
}
}

然后在配置中,将这个拦截器设置为你的通道

@Bean(name = OUTPUT_CHANNEL)
public BroadcastCapableChannel pubSubChannel() {
PublishSubscribeChannel channel = new PublishSubscribeChannel();
channel.setInterceptors(Arrays.asList(new CustomChannelInterceptor()));
return channel;
}

相关内容

  • 没有找到相关文章

最新更新