Apache ActiveMQ Artemis client reconnecting to the next available broker in a clustered HA replication/shared data store setup



broker.xml for host1 (host2 uses the same file, with only the port changed to 61616 and the broker configured as slave). See also: Apache Artemis client failover discovery.

<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<configuration xmlns="urn:activemq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
  <core xmlns="urn:activemq:core">
    <bindings-directory>./data/bindings</bindings-directory>
    <journal-directory>./data/journal</journal-directory>
    <large-messages-directory>./data/largemessages</large-messages-directory>
    <paging-directory>./data/paging</paging-directory>
    <!-- Connectors -->
    <connectors>
      <connector name="netty-connector">tcp://10.64.60.100:61617</connector> <!-- direct IP address of host myhost1 -->
      <connector name="broker2-connector">tcp://myhost2:61616</connector> <!-- IP 10.64.60.101 (mocked-up IP for security reasons) -->
    </connectors>
    <!-- Acceptors -->
    <acceptors>
      <acceptor name="amqp">tcp://0.0.0.0:61617?amqpIdleTimeout=0;tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP;useEpoll=true</acceptor>
    </acceptors>
    <ha-policy>
      <replication>
        <master/>
      </replication>
    </ha-policy>
    <cluster-connections>
      <cluster-connection name="myhost1-cluster">
        <connector-ref>netty-connector</connector-ref>
        <retry-interval>500</retry-interval>
        <use-duplicate-detection>true</use-duplicate-detection>
        <message-load-balancing>ON_DEMAND</message-load-balancing>
        <max-hops>1</max-hops>
        <static-connectors>
          <connector-ref>broker2-connector</connector-ref> <!-- defined in the connectors -->
        </static-connectors>
      </cluster-connection>
    </cluster-connections>
    <security-settings>
      <security-setting match="#">
        <permission type="createNonDurableQueue" roles="amq"/>
        <permission type="deleteNonDurableQueue" roles="amq"/>
        <permission type="createDurableQueue" roles="amq"/>
        <permission type="deleteDurableQueue" roles="amq"/>
        <permission type="createAddress" roles="amq"/>
        <permission type="deleteAddress" roles="amq"/>
        <permission type="consume" roles="amq"/>
        <permission type="browse" roles="amq"/>
        <permission type="send" roles="amq"/>
        <permission type="manage" roles="amq"/>
      </security-setting>
    </security-settings>
    <address-settings>
      <!-- default for catch all -->
      <address-setting match="#">
        <dead-letter-address>DLQ</dead-letter-address>
        <expiry-address>ExpiryQueue</expiry-address>
        <redelivery-delay>0</redelivery-delay>
        <!-- with -1 only the global-max-size is in use for limiting -->
        <max-size-bytes>-1</max-size-bytes>
        <message-counter-history-day-limit>10</message-counter-history-day-limit>
        <address-full-policy>PAGE</address-full-policy>
        <auto-create-queues>true</auto-create-queues>
        <auto-create-addresses>true</auto-create-addresses>
        <auto-delete-queues>false</auto-delete-queues>
        <auto-delete-created-queues>false</auto-delete-created-queues>
        <auto-delete-addresses>false</auto-delete-addresses>
      </address-setting>
    </address-settings>
  </core>
</configuration>
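
The broker.xml for host2 is not shown. Going by the description (same file, port 61616, configured as slave), the sections that differ would look roughly like the sketch below. The connector name broker1-connector is an assumption made for illustration, and everything not shown (directories, security-settings, address-settings) is assumed to be identical to host1.

```xml
<!-- Sketch only: host2 (slave) sections that differ from host1 -->
<connectors>
  <!-- this broker's own address -->
  <connector name="netty-connector">tcp://myhost2:61616</connector>
  <!-- assumed name; points back at the master on host1 -->
  <connector name="broker1-connector">tcp://10.64.60.100:61617</connector>
</connectors>
<acceptors>
  <acceptor name="amqp">tcp://0.0.0.0:61616?amqpIdleTimeout=0;tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP;useEpoll=true</acceptor>
</acceptors>
<ha-policy>
  <replication>
    <slave/>
  </replication>
</ha-policy>
<cluster-connections>
  <cluster-connection name="myhost1-cluster">
    <connector-ref>netty-connector</connector-ref>
    <retry-interval>500</retry-interval>
    <use-duplicate-detection>true</use-duplicate-detection>
    <message-load-balancing>ON_DEMAND</message-load-balancing>
    <max-hops>1</max-hops>
    <static-connectors>
      <connector-ref>broker1-connector</connector-ref>
    </static-connectors>
  </cluster-connection>
</cluster-connections>
```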

The client produces messages from Java, using a ProducerTemplate to push them to a direct endpoint, from which they are routed to the queue:

Java-camel-producer-client

package com.demo.artemis;

import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.spring.SpringCamelContext;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class ToRoute {
    public static void main(String[] args) throws Exception {
        ApplicationContext appContext = new ClassPathXmlApplicationContext(
                "camel-context-producer.xml");
        ProducerTemplate template = null;
        CamelContext camelContext = SpringCamelContext.springCamelContext(
                appContext, false);
        try {
            camelContext.start();
            template = camelContext.createProducerTemplate();
            String msg = null;
            int loop = 0;
            int i = 0;
            // Sends 9,999 messages ("---> 2" through "---> 10000"), then stops
            // when the counter wraps around for the second time.
            while (true) {
                if (i % 10000 == 0) {
                    i = 1;
                    loop = loop + 1;
                }
                if (loop == 2) break;
                msg = "---> " + (++i);
                // The endpoint name must match the route's <from uri> in camel-context-producer.xml.
                template.sendBody("direct:toMyQueue", msg);
            }
        } finally {
            if (template != null) {
                template.stop();
            }
            camelContext.stop();
        }
    }
}

Camel context that sends the messages to the queue (call it camel-producer-client):

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:jee="http://www.springframework.org/schema/jee"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/aop
           http://www.springframework.org/schema/aop/spring-aop-3.1.xsd
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context-3.1.xsd
           http://www.springframework.org/schema/jee
           http://www.springframework.org/schema/jee/spring-jee-3.1.xsd
           http://www.springframework.org/schema/tx
           http://www.springframework.org/schema/tx/spring-tx-3.1.xsd
           http://camel.apache.org/schema/spring
           http://camel.apache.org/schema/spring/camel-spring.xsd">
  <bean id="jmsConnectionFactory" class="org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory">
    <constructor-arg index="0" value="(tcp://myhost:61616,tcp://myhost1:61617)?ha=true;reconnectAttempts=-1;"/>
  </bean>
  <bean id="jmsPooledConnectionFactory" class="org.messaginghub.pooled.jms.JmsPoolConnectionFactory" init-method="start" destroy-method="stop">
    <property name="maxConnections" value="10"/>
    <property name="connectionFactory" ref="jmsConnectionFactory"/>
  </bean>
  <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
    <property name="connectionFactory" ref="jmsPooledConnectionFactory"/>
    <property name="concurrentConsumers" value="5"/>
  </bean>
  <bean id="jms" class="org.apache.camel.component.jms.JmsComponent">
    <property name="configuration" ref="jmsConfig"/>
    <property name="streamMessageTypeEnabled" value="true"/>
  </bean>
  <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
    <endpoint id="myqueue" uri="jms:queue:myExampleQueue"/>
    <route>
      <!-- must match the endpoint the Java producer sends to -->
      <from uri="direct:toMyQueue"/>
      <transform>
        <simple>MSG FRM DIRECT TO MyExampleQueue : ${bodyAs(String)}</simple>
      </transform>
      <to uri="ref:myqueue"/>
    </route>
  </camelContext>
</beans>

Camel consumer that reads from MyExampleQueue and transforms the messages into OutBoundQueue (this context file is started with Camel's org.apache.camel.spring.Main -ac; call it camel-consumer-client):

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:jee="http://www.springframework.org/schema/jee"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/aop
           http://www.springframework.org/schema/aop/spring-aop-3.1.xsd
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context-3.1.xsd
           http://www.springframework.org/schema/jee
           http://www.springframework.org/schema/jee/spring-jee-3.1.xsd
           http://www.springframework.org/schema/tx
           http://www.springframework.org/schema/tx/spring-tx-3.1.xsd
           http://camel.apache.org/schema/spring
           http://camel.apache.org/schema/spring/camel-spring.xsd">
  <bean id="jndiTemplate" class="org.springframework.jndi.JndiTemplate">
    <property name="environment">
      <props>
        <prop key="java.naming.factory.initial">org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory</prop>
        <prop key="connectionFactory.ConnectionFactory">(tcp://myhost1:61617,tcp://myhost2:61616)?ha=true;retryInterval=1000;retryIntervalMultiplier=1.0;reconnectAttempts=-1;</prop>
        <prop key="queue.queue/MyExampleQueue">MyExampleQueue</prop>
        <prop key="queue.queue/OutBoundQueue">OutBoundQueue</prop>
      </props>
    </property>
  </bean>
  <bean id="jndiFactoryBean" class="org.springframework.jndi.JndiObjectFactoryBean">
    <property name="jndiName" value="ConnectionFactory"/>
    <property name="jndiTemplate" ref="jndiTemplate"/>
    <property name="cache" value="true"/>
  </bean>
  <bean id="jndiDestinationResolver" class="org.springframework.jms.support.destination.JndiDestinationResolver">
    <property name="jndiTemplate" ref="jndiTemplate"/>
    <property name="cache" value="true"/>
    <!-- dynamic destination if the destination name is not found in JNDI -->
    <property name="fallbackToDynamicDestination" value="true"/>
  </bean>
  <bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
    <property name="connectionFactory" ref="jndiFactoryBean"/>
    <property name="destinationResolver" ref="jndiDestinationResolver"/>
    <property name="concurrentConsumers" value="10"/>
  </bean>
  <bean id="jms" class="org.apache.camel.component.jms.JmsComponent">
    <property name="configuration" ref="jmsConfig"/>
  </bean>
  <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
    <endpoint id="inqueue" uri="jms:queue:MyExampleQueue"/>
    <endpoint id="outqueue" uri="jms:queue:OutBoundQueue"/>
    <route>
      <from uri="ref:inqueue"/>
      <convertBodyTo type="java.lang.String"/>
      <transform>
        <simple>MSG FRM MyExampleQueue TO OutboundQueue : ${bodyAs(String)}</simple>
      </transform>
      <to uri="ref:outqueue"/>
    </route>
  </camelContext>
</beans>

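I think the issue I am facing is similar to this thread: http://activemq.2283324.n4.nabble.com/Connection-to-the-backup-after-master-shutdown-td4753770.html

With the Camel context, I see that when the master becomes unavailable, the client consumer is redirected to the slave (I am using JNDI with JmsPoolConnectionFactory).

The Java-producer-client uses a ProducerTemplate to send messages to the direct endpoint, where they are transformed and routed to the queue. When the master goes down, this client fails to fail over and connect to the slave. (Question: is this the expected behavior when the brokers are in HA mode?)

After the reconnection attempts, with the master down, I see the following exception.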

Caused by: javax.jms.JMSException: AMQ219016: Connection failure detected. Unblocking a blocking call that will never get a response

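On the other hand, the camel-context-consumer started with Main is able to fail over to the slave automatically. In the console I can see a consumer count of 10 on the slave host, and data is being processed. However, when the master came back up, the client did not switch back to the live master. Question: is this expected as well?

The following convention was used to create the connection factory.

(tcp://myhost:61616,tcp://myhost1:61617)?ha=true;reconnectAttempts=-1;

Question: do we need to configure broadcast and discovery groups even when using static connectors?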

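If failover happens while the client is in the middle of a blocking call (for example, sending a durable message and waiting for the broker's acknowledgement, or committing a transaction), you will get an error saying "Unblocking a blocking call that will never get a response". This is discussed further in the documentation.

Given your configuration, the fact that clients do not switch back to the master broker when it comes back is also expected. In short, you have not configured failback properly. Your master should have: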

<ha-policy>
  <replication>
    <master>
      <check-for-live-server>true</check-for-live-server>
    </master>
  </replication>
</ha-policy>

Your slave should have:

<ha-policy>
  <replication>
    <slave>
      <allow-failback>true</allow-failback>
    </slave>
  </replication>
</ha-policy>

This is also discussed in the documentation.

Lastly, when using static connectors you do not need to configure broadcast groups and discovery groups.
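
For contrast, here is a rough sketch of what the multicast-based alternative would involve if static connectors were not used. The group names (bg-group1, dg-group1), the multicast address and port, and the timing values are placeholders rather than anything from the question's setup; only netty-connector and the cluster-connection name come from the broker.xml above.

```xml
<!-- Sketch only: UDP multicast discovery instead of static connectors -->
<broadcast-groups>
  <broadcast-group name="bg-group1">
    <group-address>231.7.7.7</group-address>   <!-- placeholder multicast address -->
    <group-port>9876</group-port>              <!-- placeholder port -->
    <broadcast-period>5000</broadcast-period>
    <connector-ref>netty-connector</connector-ref>
  </broadcast-group>
</broadcast-groups>
<discovery-groups>
  <discovery-group name="dg-group1">
    <group-address>231.7.7.7</group-address>
    <group-port>9876</group-port>
    <refresh-timeout>10000</refresh-timeout>
  </discovery-group>
</discovery-groups>
<cluster-connections>
  <cluster-connection name="myhost1-cluster">
    <connector-ref>netty-connector</connector-ref>
    <!-- replaces the <static-connectors> element -->
    <discovery-group-ref discovery-group-name="dg-group1"/>
  </cluster-connection>
</cluster-connections>
```

With static connectors, none of the broadcast-group or discovery-group elements are required, which is why the configuration in the question can omit them.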

After handling the JMSException in the producer, it was also able to fail over successfully:

<!-- added this handler -->
<bean id="deadLetterErrorHandler"
      class="org.apache.camel.builder.DeadLetterChannelBuilder">
  <property name="deadLetterUri" value="log:dead"/>
</bean>
<!-- referred the handler -->
<route errorHandlerRef="deadLetterErrorHandler">
  <from uri="direct:toMyQueue"/>
  <transform>
    <simple>MSG FRM DIRECT TO MyExampleQueue : ${bodyAs(String)}</simple>
  </transform>
  <to uri="ref:myqueue"/>
</route>

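I started two VMs, 172.28.128.28 (master) and 172.28.128.100 (slave/backup). When the master went down, the slave became live on the broker side, and the client-side failover log is shown below.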

2020-06-06 07:26:11 DEBUG NettyConnector:1259 - NettyConnector [host=172.28.128.28, port=61616, httpEnabled=false, httpUpgradeEnabled=false, useServlet=false, servletPath=/messaging/ActiveMQServlet, sslEnabled=false, useNio=true] host 1: 172.28.128.100 ip address: 172.28.128.100 host 2: 172.28.128.28 ip address: 172.28.128.28
2020-06-06 07:26:11 DEBUG ClientSessionFactoryImpl:272 - Setting up backup config = TransportConfiguration(name=netty-connector, factory=org-apache-activemq-artemis-core-remoting-impl-netty-NettyConnectorFactory) ?port=61616&host=172-28-128-100 for live = TransportConfiguration(name=netty-connector, factory=org-apache-activemq-artemis-core-remoting-impl-netty-NettyConnectorFactory) ?port=61616&host=172-28-128-28
2020-06-06 07:26:11 DEBUG JmsConfiguration$CamelJmsTemplate:502 - Executing callback on JMS Session: JmsPoolSession { ActiveMQSession->ClientSessionImpl [name=ab4faacf-a801-11ea-9c64-0a002700000c, username=null, closed=false, factory = org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl@7b18658a, metaData=(jms-session=,)]@1118d539 }
2020-06-06 07:26:11 DEBUG JmsConfiguration:621 - Sending JMS message to: ActiveMQQueue[myExampleQueue] with message: ActiveMQMessage[null]:PERSISTENT/ClientMessageImpl[messageID=0, durable=true, address=null,userID=null,properties=TypedProperties[]]
2020-06-06 07:26:11 DEBUG DefaultProducerCache:169 - >>>> direct://toMyQueue Exchange[]
2020-06-06 07:26:11 DEBUG SendProcessor:167 - >>>> ref://myqueue Exchange[]
...
2020-06-06 07:27:15 DEBUG ClientSessionFactoryImpl:800 - Trying reconnection attempt 0/1
2020-06-06 07:27:15 DEBUG ClientSessionFactoryImpl:1102 - Trying to connect with connectorFactory = org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory@2fd9fb34, connectorConfig=TransportConfiguration(name=netty-connector, factory=org-apache-activemq-artemis-core-remoting-impl-netty-NettyConnectorFactory) ?port=61616&host=172-28-128-100
2020-06-06 07:27:15 DEBUG NettyConnector:508 - Connector + NettyConnector [host=172.28.128.100, port=61616, httpEnabled=false, httpUpgradeEnabled=false, useServlet=false, servletPath=/messaging/ActiveMQServlet, sslEnabled=false, useNio=true] using nio
2020-06-06 07:27:15 DEBUG client:670 - AMQ211002: Started NIO Netty Connector version 4.1.48.Final to 172.28.128.100:61616
2020-06-06 07:27:15 DEBUG NettyConnector:805 - Remote destination: /172.28.128.100:61616
2020-06-06 07:27:15 DEBUG NettyConnector:661 - Added ActiveMQClientChannelHandler to Channel with id = 62e089d3 
2020-06-06 07:27:15 DEBUG ClientSessionFactoryImpl:809 - Reconnection successful
2020-06-06 07:27:15 DEBUG ClientSessionFactoryImpl:277 - ClientSessionFactoryImpl received backup update for live/backup pair = TransportConfiguration(name=netty-connector, factory=org-apache-activemq-artemis-core-remoting-impl-netty-NettyConnectorFactory) ?port=61616&host=172-28-128-100 / null but it didn't belong to TransportConfiguration(name=netty-connector, factory=org-apache-activemq-artemis-core-remoting-impl-netty-NettyConnectorFactory) ?port=61616&host=172-28-128-100
2020-06-06 07:27:15 DEBUG JmsConfiguration$CamelJmsTemplate:502 - Executing callback on JMS Session: JmsPoolSession { ActiveMQSession->ClientSessionImpl [name=d200c1a0-a801-11ea-9c64-0a002700000c, username=null, closed=false, factory = org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl@12abcd1e, metaData=(jms-session=,)]@33d28f0a }
2020-06-06 07:27:16 DEBUG JmsConfiguration:621 - Sending JMS message to: ActiveMQQueue[myExampleQueue] with message: ActiveMQMessage[null]:PERSISTENT/ClientMessageImpl[messageID=0, durable=true, address=null,userID=null,properties=TypedProperties[]]
2020-06-06 07:27:16 DEBUG DefaultProducerCache:169 - >>>> direct://toMyQueue Exchange[]
2020-06-06 07:27:16 DEBUG SendProcessor:167 - >>>> ref://myqueue Exchange[]

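When the client (consumer) uses a Camel route from broker to broker (queue-to-queue routing) or from broker (queue) to a Spring bean, the JMS exception does not occur, but it would still be helpful to handle these exceptions and redeliver.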
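
The DeadLetterChannelBuilder bean shown earlier sends a failed exchange to log:dead. If the goal is also to retry the send first, Camel's Spring XML allows a redelivery policy to be attached to a dead letter channel declared inside the camelContext. The following is a minimal sketch, not tested against this setup: the id redeliveringErrorHandler and the values 5 and 1000 ms are arbitrary choices for illustration.

```xml
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
  <!-- Retry a failed exchange up to 5 times, 1 second apart, before
       handing it to the dead letter endpoint (values are illustrative) -->
  <errorHandler id="redeliveringErrorHandler" type="DeadLetterChannel" deadLetterUri="log:dead">
    <redeliveryPolicy maximumRedeliveries="5" redeliveryDelay="1000"/>
  </errorHandler>
  <endpoint id="myqueue" uri="jms:queue:myExampleQueue"/>
  <route errorHandlerRef="redeliveringErrorHandler">
    <from uri="direct:toMyQueue"/>
    <transform>
      <simple>MSG FRM DIRECT TO MyExampleQueue : ${bodyAs(String)}</simple>
    </transform>
    <to uri="ref:myqueue"/>
  </route>
</camelContext>
```

Whether a redelivered send succeeds depends on the client having reconnected to the backup by then, so the delay would presumably need to be tuned against the observed failover time.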
