如何根据发布者确认提供响应



我有一个web服务,它摄取对象,通过AMQP发送通知,并向请求者返回JSON响应。每个请求都在一个线程上执行,我试图实现出版商确认,我正在努力如何设置它。我让它工作,但我不喜欢我这样做。

我的做法是:

  • 给消息添加一些标题
  • 有一个包含2个订阅者的发布-订阅-通道
  • 订阅者1)创建阻塞队列,使其准备就绪并通过amqp
  • 发送消息
  • 订阅者2)开始在该队列上拉动5秒,直到它得到确认
  • amqp:outbound-channel-adapter将其发布者确认发送到服务激活器
  • publisherConfirmReceiver接收确认并将其放入阻塞队列,导致订阅者2的拉取完成并返回确认结果。

此技术确实可以正常工作,但我不喜欢假设链将在发布订阅通道的waitForPublisherConfirm服务激活器之前接收消息。在这种情况下,顺序与哪个组件首先接收消息有关。

如果waitforpublisherconfirm服务激活器首先收到消息,它将阻塞线程5秒,然后允许链通过amqp:outbound-channel-adapter发送消息。

我试着把waitforpublisherconfirm放在amqp:outbound-channel-adapter之后,但是由于outbound-channel-adapter没有"返回"任何东西,所以服务激活器在链中永远不会被调用。

我觉得应该有更好的方法来做这件事。我的目标是在向请求者发送响应之前等待发布者确认(或者在spring的发布者确认中无法找到支持的超时)。

你能帮我把解决方案设计得更好一点吗?或者让我知道是否可以依赖于这样一个事实,即发布-订阅-通道的第一个订阅者总是首先收到消息。

对不起,这封信太长了。

一些配置

<int:header-enricher input-channel="addHeaders" output-channel="metadataIngestNotifications">
    <int:header name="routingKey" ref="routingKeyResolver" method="resolveReoutingKey"/>
    <int:header name="notificationId" expression="payload.id" />
</int:header-enricher>
<int:chain input-channel="metadataIngestNotifications" output-channel="nullChannel" >
    <int:service-activator id="addPublisherConfirmQueue"
        requires-reply="false"
        ref="publisherConfirmService"  
        method="addPublisherConfirmQueue" />
    <int:object-to-json-transformer id="transformObjectToJson" />
    <int-amqp:outbound-channel-adapter id="amqpOutboundChannelAdapter"
        amqp-template="rabbitTemplate"
        exchange-name="${productNotificationExchange}"
        confirm-ack-channel="publisherConfirms"
        confirm-nack-channel="publisherConfirms"
        mapped-request-headers="*"
        routing-key-expression="headers.routingKey"
        confirm-correlation-expression="headers.notificationId" />
</int:chain>
<int:service-activator id="waitForPublisherConfirm"
        input-channel="metadataIngestNotifications"
        output-channel="publisherConfirmed"
        requires-reply="true"
        ref="publisherConfirmService"  
        method="waitForPublisherConfirm"  />

<int:service-activator id="publisherConfirmReceiver" 
                       ref="publisherConfirmService" 
                       method="receivePublisherConfirm" 
                       input-channel="publisherConfirms" 
                       output-channel="nullChannel" />

public class PublisherConfirmService {
    private final Map<String, BlockingQueue<Boolean>> suspenders = new HashMap<>();
    public Message addPublisherConfirmQueue(@Header("notificationId") String id, Message m){
        LogManager.getLogger(this.getClass()).info("Adding publisher confirm queue.");
        BlockingQueue<Boolean> bq = new LinkedBlockingQueue<>();
        suspenders.put(id, bq);
        return m;
    }
    public boolean waitForPublisherConfirm(@Header("notificationId") String id) {
        LogManager.getLogger(this.getClass()).info("Waiting for publisher confirms for Notification: " + id);
        BlockingQueue<Boolean> bq = suspenders.get(id);
        try {
            Boolean result = bq.poll(5, TimeUnit.SECONDS);
            if(result == null){
                LogManager.getLogger(this.getClass()).error("The broker took too long to return a publisher confirm. NotificationId: " + id);
                return false;
            }else if(!result){
                LogManager.getLogger(this.getClass()).error("The publisher confirm indicated that the message was not confirmed. NotificationId: " + id);
                return false;
            }
        } catch (InterruptedException ex) {
            LogManager.getLogger(this.getClass()).error("Something went wrong polling for the publisher confirm for notificationId: " + id, ex);
            return false;
        }finally{
            suspenders.remove(id);
        }
        return true;
    }
    public void receivePublisherConfirm(String id, @Header(AmqpHeaders.PUBLISH_CONFIRM) boolean confirmed){
        LogManager.getLogger(this.getClass()).info("Received publisher confirm for Notification: " + id);
        if (suspenders.containsKey(id)){
            BlockingQueue<Boolean> bq = suspenders.get(id);
            bq.add(confirmed);
        }
    } 
}

看看同样目的的聚合器解决方案如何?

<recipient-list-router>向聚合器的input-channel发送消息,<int-amqp:outbound-channel-adapter>向第二通道发送消息。

confirm-ack-channel必须是经过一些转换后将消息带到同一聚合器的东西,例如correlationKey的适当提取等。

最新更新