spring集成-无法使聚合器工作



我正在努力了解聚合器的基本知识。下面是我尝试实现的用例:

1) 从队列中读取消息(订单详细信息)。

<?xml version="1.0" encoding="UTF-8"?>
<order xmlns="http://www.example.org/orders">
  <orderItem>
    <isbn>12333454443</isbn>
    <quantity>4</quantity>
  </orderItem>
  <orderItem>
    <isbn>545656777</isbn>
    <quantity>50</quantity>
  </orderItem>
..
..
</order>

一个订单消息将包含多个orderItem。我们预计队列中会有数百条订单消息。

2) 最终结果::

a) 每个订单项都应该写入一个文件。

b) 4此类文件应写入一个唯一的文件夹。

举个例子,假设我们得到两条订单消息,每条消息包含三条orderitem

所以我们需要创建2个文件夹:

在"文件夹1"中,应该有4个文件(每个文件中有1个订单项

在"文件夹2"中,应该有2个文件(每个文件中有1个订单项)。为了简单起见,我们假设不再收到订单消息,并且我们可以在5分钟后进行编写。

实施:


  1. 我能够从队列(webspheremq)中读取消息,并成功地解组该消息
  2. 使用拆分器根据订单项计数拆分邮件
  3. 已使用聚合器对大小为4的邮件进行分组

我无法让聚合器按照我的理解工作。

  1. 我推送了一个订单,当4个orderitem时,消息得到了正确的聚合
  2. 我推送了一个订单和5个orderitem,前4个正在聚合,但最后一个被发送到丢弃通道。这是预期的,因为MessageGroup已释放,因此最后一条消息将被丢弃
  3. 我推送两个订单,每个订单包含2个orderitem。最后2个订单项被发送到放弃通道
    关联策略是硬编码的(OrderAggregator.java),但上面的情况应该已经起作用了

需要关于如何实现这个用例的指针,在这个用例中,我可以将它们分组为4,并写入唯一的文件夹。请注意,订单项都是独立的图书订单,它们之间没有关系。

以下是配置。

spring-ban.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans">
  <int:channel id="mqInbound"/>
  <int:channel id="item"/>
  <int:channel id="itemList"/>
  <int:channel id="aggregatorDiscardChannel"/>
  <int-jms:message-driven-channel-adapter id="jmsIn"
                                      channel="mqInbound"
                                      destination="requestQueue" 
                                      message-   converter="orderMessageConverter"/>
  <int:splitter input-channel="mqInbound"  output-channel="item" expression="payload.orderItem"/>
  <int:chain id="aggregateList" input-channel="item" output-channel="itemList"  >
    <int:header-enricher>
      <int:header name="sequenceSize" expression="4" overwrite="true"/>
    </int:header-enricher>
    <int:aggregator  correlation-strategy="orderAggregator" correlation-strategy-method="groupOrders"  discard-channel="aggregatorDiscardChannel" />
  </int:chain>
  <int:service-activator input-channel="itemList"                 ref="displayAggregatedList" method="display"/>
  <int:service-activator input-channel="aggregatorDiscardChannel" ref="displayAggregatedList" method="displayDiscarded"/>
  <bean id="orderAggregator"       class="com.samples.Aggregator.OrderAggregator"/>
  <bean id="displayAggregatedList" class="com.samples.Aggregator.DisplayAggregatedList"/>
  ...
  ....
</beans>

订单聚合器.java

public class OrderAggregator {
@Aggregator
public List<OrderItemType> sendList(List<OrderItemType> orderItemTypeList) {
    return orderItemTypeList;
}
@CorrelationStrategy
public String groupOrders( OrderItemType orderItemType) {
    return "items";
}
}

显示聚合列表.java

public class DisplayAggregatedList {
public void display(List <OrderItemType> orderItemTypeList) {
    System.out.println("######## Display Aggregated ##############");
    for(OrderItemType oit : orderItemTypeList) {
        System.out.println("### Isbn :" + oit.getIsbn() + ":: Quantity :" + oit.getQuantity());
    }
}
public void displayDiscarded(Message<?> message) {
    System.out.println("######## Display Discarded ##############" + message);
}
}   

您需要的是expire-groups-upon-completion:

当设置为true(默认为false)时,已完成的组将从消息存储中删除,从而允许具有相同相关性的后续消息组成新组。默认行为是将具有与已完成组相同相关性的消息发送到丢弃通道。

如果您仍然需要发布未完成的组(例如,还剩2个订单),请考虑使用group-timeout:http://docs.spring.io/spring-integration/reference/html/messaging-routing-chapter.html#agg-并分组到

请在完成时使用过期组="true",并考虑使用MessageCountReleaseStrategy `作为发布策略–Artem Bilan

相关内容

  • 没有找到相关文章

最新更新