影响其他消息的 JmsTransactionManager



我正在尝试使客户端确认模式在JMS中工作。我目前有一个在自动确认中工作的队列,我不知道还需要做些什么才能使其成为客户端确认。

执行时,我没有收到任何错误。它似乎只是在对其他消息(日志中的不同 id)做某事。

应用程序上下文.xml

<bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <constructor-arg index="0" value="tcp://localhost:61616" />
    </bean>
<bean id="connectionFactory"
    class="org.springframework.jms.connection.CachingConnectionFactory">
    <constructor-arg ref="amqConnectionFactory" />
</bean>
<bean id="defaultDestination" class="org.apache.activemq.command.ActiveMQQueue">
    <!-- name of the queue -->
    <constructor-arg index="0" value="MyQueue" />
</bean>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="defaultDestination" ref="defaultDestination" />
    <property name="sessionTransacted" value="true" />
    <property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE" />
</bean>
<bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
    <property name="connectionFactory">
        <ref bean="amqConnectionFactory"/>
    </property>
</bean>

QueueJMSProxy.java

private JmsTemplate jmsTemplate;
@Autowired
private JmsTransactionManager jmsTransactionManager;
@Autowired
public QueueJMSProxy(JmsTemplate jmsTemplate) {
    this.jmsTemplate = jmsTemplate;
    jmsTemplate.setReceiveTimeout(BAGSchedulerCTE.RECEIVE_JMS_TIMEOUT);
}
public synchronized MyMessage receiveMessageFromBSG() {
    //1
    TransactionStatus status = jmsTransactionManager.getTransaction(new DefaultTransactionDefinition());
    //2
    Message receivedMessage = jmsTemplate.receive();
    MyMessage myMessage = saveInDB(receivedMessage);

    //3
    jmsTransactionManager.rollback(status);
    return myMessage;
}

1、2 和 3 的日志

//1
2017-01-27_11:32:36,209 DEBUG main | transport.WireFormatNegotiator:82->sendWireFormat  | Sending: WireFormatInfo { version=10, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, Host=localhost, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
2017-01-27_11:32:36,219 DEBUG ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@60606 | transport.InactivityMonitor:92->configuredOk  | Using min of local: WireFormatInfo { version=10, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, Host=localhost, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} and remote: WireFormatInfo { version=10, properties={CacheSize=1024, MaxFrameSize=104857600, CacheEnabled=true, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
2017-01-27_11:32:36,224 DEBUG ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@60606 | transport.WireFormatNegotiator:118->negociate  | Received WireFormat: WireFormatInfo { version=10, properties={CacheSize=1024, MaxFrameSize=104857600, CacheEnabled=true, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
2017-01-27_11:32:36,226 DEBUG ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@60606 | transport.WireFormatNegotiator:125->negociate  | tcp://localhost/127.0.0.1:61616@60606 before negotiation: OpenWireFormat{version=10, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false, maxFrameSize=9223372036854775807}
2017-01-27_11:32:36,226 DEBUG ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@60606 | transport.WireFormatNegotiator:140->negociate  | tcp://localhost/127.0.0.1:61616@60606 after negotiation: OpenWireFormat{version=10, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false, maxFrameSize=104857600}
//2
2017-01-27_11:32:40,851 DEBUG main | transport.WireFormatNegotiator:82->sendWireFormat  | Sending: WireFormatInfo { version=10, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, Host=localhost, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
2017-01-27_11:32:40,852  INFO main | connection.CachingConnectionFactory:311->initConnection  | Established shared JMS Connection: ActiveMQConnection {id=ID:LATES-0008-60605-1485513156084-1:2,clientId=null,started=false}
2017-01-27_11:32:40,856 DEBUG ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@60607 | transport.InactivityMonitor:92->configuredOk  | Using min of local: WireFormatInfo { version=10, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, Host=localhost, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} and remote: WireFormatInfo { version=10, properties={CacheSize=1024, MaxFrameSize=104857600, CacheEnabled=true, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
2017-01-27_11:32:40,856 DEBUG ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@60607 | transport.WireFormatNegotiator:118->negociate  | Received WireFormat: WireFormatInfo { version=10, properties={CacheSize=1024, MaxFrameSize=104857600, CacheEnabled=true, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
2017-01-27_11:32:40,856 DEBUG ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@60607 | transport.WireFormatNegotiator:125->negociate  | tcp://localhost/127.0.0.1:61616@60607 before negotiation: OpenWireFormat{version=10, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false, maxFrameSize=9223372036854775807}
2017-01-27_11:32:40,857 DEBUG ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@60607 | transport.WireFormatNegotiator:140->negociate  | tcp://localhost/127.0.0.1:61616@60607 after negotiation: OpenWireFormat{version=10, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false, maxFrameSize=104857600}
2017-01-27_11:32:40,923 DEBUG ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@60607 | thread.TaskRunnerFactory:91->init  | Initialized TaskRunnerFactory[ActiveMQ Session Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@799e037[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
2017-01-27_11:32:40,929 DEBUG main | activemq.TransactionContext:248->begin  | Begin:TX:ID:LATES-0008-60605-1485513156084-1:2:1
2017-01-27_11:32:40,940 DEBUG main | activemq.ActiveMQSession:572->commit  | ID:LATES-0008-60605-1485513156084-1:2:1 Transaction Commit :TX:ID:LATES-0008-60605-1485513156084-1:2:1
2017-01-27_11:32:40,941 DEBUG main | activemq.TransactionContext:317->commit  | Commit: TX:ID:LATES-0008-60605-1485513156084-1:2:1 syncCount: 1
//3
2017-01-27_11:32:47,026 DEBUG main | activemq.ActiveMQSession:593->rollback  | ID:LATES-0008-60605-1485513156084-1:1:1 Transaction Rollback, txid:null
2017-01-27_11:32:47,069 DEBUG main | util.ThreadPoolUtils:136->doShutdown  | Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@2a6a9ba5[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] is shutdown: true and terminated: true took: 0.001 seconds.
2017-01-27_11:32:47,071 DEBUG main | tcp.TcpTransport:525->doStop  | Stopping transport tcp://localhost/127.0.0.1:61616@60606
2017-01-27_11:32:47,074 DEBUG main | thread.TaskRunnerFactory:91->init  | Initialized TaskRunnerFactory[ActiveMQ Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@316ed8bb[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
2017-01-27_11:32:47,076 DEBUG ActiveMQ Task-1 | tcp.TcpTransport:543->run  | Closed socket Socket[addr=localhost/127.0.0.1,port=61616,localport=60606]
2017-01-27_11:32:47,076 DEBUG main | util.ThreadPoolUtils:54->shutdownNow  | Forcing shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@316ed8bb[Running, pool size = 1, active threads = 0, queued tasks = 0, completed tasks = 1]
我不

明白这个问题,你能发布你的日志吗,最好像这样设置这些属性:

<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="defaultDestination" ref="defaultDestination" />
    <property name="sessionTransacted" value="true" />
    <property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE" />
</bean>

更新

使用 CachingConnectionFactory 作为此事务的目标 强烈建议经理。CachingConnectionFactory 使用 所有 JMS 访问的单个 JMS 连接,以避免 重复创建连接的开销,以及维护 会话的缓存。然后,每个事务将共享相同的 JMS 连接,同时仍使用其自己的单个 JMS 会话。

http://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/jms/connection/JmsTransactionManager.html

像这样的JmsTransactionManager.connectionFactory

<bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
    <property name="connectionFactory">
        <ref bean="connectionFactory"/>
    </property>
</bean>

相关内容

  • 没有找到相关文章

最新更新