我正在尝试使客户端确认模式在JMS中工作。我目前有一个在自动确认中工作的队列,我不知道还需要做些什么才能使其成为客户端确认。
执行时,我没有收到任何错误。它似乎只是在对其他消息(日志中的不同 id)做某事。
应用程序上下文.xml
<bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<constructor-arg index="0" value="tcp://localhost:61616" />
</bean>
<bean id="connectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<constructor-arg ref="amqConnectionFactory" />
</bean>
<bean id="defaultDestination" class="org.apache.activemq.command.ActiveMQQueue">
<!-- name of the queue -->
<constructor-arg index="0" value="MyQueue" />
</bean>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory" />
<property name="defaultDestination" ref="defaultDestination" />
<property name="sessionTransacted" value="true" />
<property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE" />
</bean>
<bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
<property name="connectionFactory">
<ref bean="amqConnectionFactory"/>
</property>
</bean>
QueueJMSProxy.java
private JmsTemplate jmsTemplate;
@Autowired
private JmsTransactionManager jmsTransactionManager;
@Autowired
public QueueJMSProxy(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
jmsTemplate.setReceiveTimeout(BAGSchedulerCTE.RECEIVE_JMS_TIMEOUT);
}
public synchronized MyMessage receiveMessageFromBSG() {
//1
TransactionStatus status = jmsTransactionManager.getTransaction(new DefaultTransactionDefinition());
//2
Message receivedMessage = jmsTemplate.receive();
MyMessage myMessage = saveInDB(receivedMessage);
//3
jmsTransactionManager.rollback(status);
return myMessage;
}
1、2 和 3 的日志
//1
2017-01-27_11:32:36,209 DEBUG main | transport.WireFormatNegotiator:82->sendWireFormat | Sending: WireFormatInfo { version=10, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, Host=localhost, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
2017-01-27_11:32:36,219 DEBUG ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@60606 | transport.InactivityMonitor:92->configuredOk | Using min of local: WireFormatInfo { version=10, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, Host=localhost, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} and remote: WireFormatInfo { version=10, properties={CacheSize=1024, MaxFrameSize=104857600, CacheEnabled=true, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
2017-01-27_11:32:36,224 DEBUG ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@60606 | transport.WireFormatNegotiator:118->negociate | Received WireFormat: WireFormatInfo { version=10, properties={CacheSize=1024, MaxFrameSize=104857600, CacheEnabled=true, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
2017-01-27_11:32:36,226 DEBUG ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@60606 | transport.WireFormatNegotiator:125->negociate | tcp://localhost/127.0.0.1:61616@60606 before negotiation: OpenWireFormat{version=10, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false, maxFrameSize=9223372036854775807}
2017-01-27_11:32:36,226 DEBUG ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@60606 | transport.WireFormatNegotiator:140->negociate | tcp://localhost/127.0.0.1:61616@60606 after negotiation: OpenWireFormat{version=10, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false, maxFrameSize=104857600}
//2
2017-01-27_11:32:40,851 DEBUG main | transport.WireFormatNegotiator:82->sendWireFormat | Sending: WireFormatInfo { version=10, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, Host=localhost, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
2017-01-27_11:32:40,852 INFO main | connection.CachingConnectionFactory:311->initConnection | Established shared JMS Connection: ActiveMQConnection {id=ID:LATES-0008-60605-1485513156084-1:2,clientId=null,started=false}
2017-01-27_11:32:40,856 DEBUG ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@60607 | transport.InactivityMonitor:92->configuredOk | Using min of local: WireFormatInfo { version=10, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, Host=localhost, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} and remote: WireFormatInfo { version=10, properties={CacheSize=1024, MaxFrameSize=104857600, CacheEnabled=true, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
2017-01-27_11:32:40,856 DEBUG ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@60607 | transport.WireFormatNegotiator:118->negociate | Received WireFormat: WireFormatInfo { version=10, properties={CacheSize=1024, MaxFrameSize=104857600, CacheEnabled=true, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
2017-01-27_11:32:40,856 DEBUG ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@60607 | transport.WireFormatNegotiator:125->negociate | tcp://localhost/127.0.0.1:61616@60607 before negotiation: OpenWireFormat{version=10, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false, maxFrameSize=9223372036854775807}
2017-01-27_11:32:40,857 DEBUG ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@60607 | transport.WireFormatNegotiator:140->negociate | tcp://localhost/127.0.0.1:61616@60607 after negotiation: OpenWireFormat{version=10, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false, maxFrameSize=104857600}
2017-01-27_11:32:40,923 DEBUG ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@60607 | thread.TaskRunnerFactory:91->init | Initialized TaskRunnerFactory[ActiveMQ Session Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@799e037[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
2017-01-27_11:32:40,929 DEBUG main | activemq.TransactionContext:248->begin | Begin:TX:ID:LATES-0008-60605-1485513156084-1:2:1
2017-01-27_11:32:40,940 DEBUG main | activemq.ActiveMQSession:572->commit | ID:LATES-0008-60605-1485513156084-1:2:1 Transaction Commit :TX:ID:LATES-0008-60605-1485513156084-1:2:1
2017-01-27_11:32:40,941 DEBUG main | activemq.TransactionContext:317->commit | Commit: TX:ID:LATES-0008-60605-1485513156084-1:2:1 syncCount: 1
//3
2017-01-27_11:32:47,026 DEBUG main | activemq.ActiveMQSession:593->rollback | ID:LATES-0008-60605-1485513156084-1:1:1 Transaction Rollback, txid:null
2017-01-27_11:32:47,069 DEBUG main | util.ThreadPoolUtils:136->doShutdown | Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@2a6a9ba5[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] is shutdown: true and terminated: true took: 0.001 seconds.
2017-01-27_11:32:47,071 DEBUG main | tcp.TcpTransport:525->doStop | Stopping transport tcp://localhost/127.0.0.1:61616@60606
2017-01-27_11:32:47,074 DEBUG main | thread.TaskRunnerFactory:91->init | Initialized TaskRunnerFactory[ActiveMQ Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@316ed8bb[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
2017-01-27_11:32:47,076 DEBUG ActiveMQ Task-1 | tcp.TcpTransport:543->run | Closed socket Socket[addr=localhost/127.0.0.1,port=61616,localport=60606]
2017-01-27_11:32:47,076 DEBUG main | util.ThreadPoolUtils:54->shutdownNow | Forcing shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@316ed8bb[Running, pool size = 1, active threads = 0, queued tasks = 0, completed tasks = 1]
明白这个问题,你能发布你的日志吗,最好像这样设置这些属性:
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory" /> <property name="defaultDestination" ref="defaultDestination" /> <property name="sessionTransacted" value="true" /> <property name="sessionAcknowledgeModeName" value="CLIENT_ACKNOWLEDGE" /> </bean>
更新
使用 CachingConnectionFactory 作为此事务的目标 强烈建议经理。CachingConnectionFactory 使用 所有 JMS 访问的单个 JMS 连接,以避免 重复创建连接的开销,以及维护 会话的缓存。然后,每个事务将共享相同的 JMS 连接,同时仍使用其自己的单个 JMS 会话。
http://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/jms/connection/JmsTransactionManager.html
像这样的JmsTransactionManager.connectionFactory
<bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager"> <property name="connectionFactory"> <ref bean="connectionFactory"/> </property> </bean>