ActiveMQ.Advisory.Consumer.Queue topic flood using spring ca



>想知道在使用Atomikos + Camel+ ActiveMQ的组合时是否有人遇到过这个问题。我正在使用这个组合以事务处理的方式从队列中剥离消息。它似乎效果很好。问题是我现在处于需要在 ActiveMQ 中打开咨询消息的情况。完成此操作后,我注意到所有队列都在不断重新创建连接。ActiveMQ.Advisory.Consumer.Queue 主题的泛滥证明了这一点。这在 DEBUG 日志记录中也很明显,因为它会不断创建连接、打开事务、提交事务并关闭连接。这在没有任何实际应用程序生成的消息的情况下发生。所有其他非事务处理队列/主题都没有此问题。我在其他几篇文章中读到,连接池和缓存可以缓解这个问题。看来我不应该使用缓存,而且我已经在连接池了。我正在使用这个配置:

<bean id="txq" class="org.apache.activemq.camel.component.ActiveMQComponent">
    <property name="configuration" ref="txJmsConfig" />
</bean>
<bean id="txJmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
    <property name="connectionFactory" ref="atomikosConnectionFactory" />
    <property name="concurrentConsumers" value="1" />
    <property name="transacted" value="true" />
    <property name="maxConcurrentConsumers" value="${consumers.concurrent.max}" />
    <property name="transactionManager" ref="jtaTransactionManager" />
    <property name="cacheLevelName" value="CACHE_NONE" />
</bean>
<bean id="atomikosConnectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean"
    init-method="init" destroy-method="close">
    <property name="uniqueResourceName">
        <value>XA-JMS-ATOMIKOS</value>
    </property>
    <property name="localTransactionMode">
        <value>false</value>
    </property>
    <property name="poolSize">
        <value>4</value>
    </property>
    <property name="xaConnectionFactory">
        <ref bean="xaJmsConnectionFactory" />
    </property>
</bean>
<bean id="xaJmsConnectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">
    <property name="brokerURL"
        value="${queue.address}?jms.watchTopicAdvisories=false&amp;jms.prefetchPolicy.all=0" />
</bean>
<bean id="userTransactionService" class="com.atomikos.icatch.config.UserTransactionServiceImp"
    init-method="init" destroy-method="shutdownForce">
    <constructor-arg>
        <props>
            <prop key="com.atomikos.icatch.service">
                com.atomikos.icatch.standalone.UserTransactionServiceFactory
            </prop>
            <prop key="com.atomikos.icatch.max_actives">${batch.transactions.concurrent.max}</prop>
        </props>
    </constructor-arg>
</bean>
<bean id="atomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager"
    init-method="init" destroy-method="close" depends-on="userTransactionService">
    <property name="forceShutdown" value="false" />
</bean>
<bean id="atomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp">
    <property name="transactionTimeout" value="300" />
</bean>
<bean id="jtaTransactionManager"
    class="org.springframework.transaction.jta.JtaTransactionManager">
    <property name="transactionManager" ref="atomikosTransactionManager" />
    <property name="userTransaction" ref="atomikosUserTransaction" />
</bean>

它使用我认为实现了池化的AtomikosConnectionFactoryBean。也许我错了?我很想听听是否有其他人和我一起在这艘船上,以及他们做了什么来修复它。

@PeterSmith建议的实施

彼得,谢谢你的建议。我更改了我的配置以使用 XaPooledConnectionFactory。春天对此并不满意。它认为 XaPooledConnectionFactory 没有实现 XAConnectionFactory。

<bean id="atomikosConnectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean"
    init-method="init" destroy-method="close" depends-on="xaJmsPooledConnectionFactory">
    <property name="uniqueResourceName">
        <value>XA-JMS-ATOMIKOS</value>
    </property>
    <property name="localTransactionMode">
        <value>false</value>
    </property>
    <property name="maxPoolSize">
        <value>32</value>
    </property>
    <property name="xaConnectionFactory">
        <ref bean="xaJmsPooledConnectionFactory" />
    </property>
</bean>
<bean id="xaJmsPooledConnectionFactory" class="org.apache.activemq.pool.XaPooledConnectionFactory"
     init-method="start" destroy-method="stop" depends-on="xaJmsConnectionFactory">
    <property name="maxConnections" value="2" />
    <property name="connectionFactory" ref="xaJmsConnectionFactory" />
</bean> 
<bean id="xaJmsConnectionFactory" class="org.apache.activemq.ActiveMQXAConnectionFactory">
    <property name="brokerURL"
        value="${queue.address}?jms.watchTopicAdvisories=false&amp;jms.prefetchPolicy.all=0" />
</bean>

java.lang.IllegalStateException:无法将属性"xaConnectionFactory"的类型 [org.apache.activemq.pool.XaPooledConnectionFactory] 的值转换为所需的类型 [javax.jms.XAConnectionFactory]:找不到匹配的编辑器或转换策略

文档指出 XaPooledConnectionFactory 类实现了 javax.jms.XAConnectionFactory,所以我在这一点上有点迷茫。看来应该行得通。

我也使用上面的组合看到了这一点。

似乎Spring DMLC(正在实现驼峰jms消费者)正在使用consumer.receive()循环来能够在XA事务中登记读取操作。

在DMLC中没有XA缓存,甚至不使用不轮询的SMLC都是可能的。

尝试将 activemq CF 包装在 org.apache.activemq.pool.PooledConnectionFactory 中,看看是否有帮助。

最新更新