我正在配置Spring Integration,以便在我的一个通道上使用cleanSession=false。
<bean id="clientFactory" class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
<property name="cleanSession" value="false" />
</bean>
<int-mqtt:message-driven-channel-adapter id="mqttLiveDataInbound"
client-id="client1"
url="${mqtt.broker.url}"
qos="1"
topics="liveData"
client-factory="clientFactory"
channel="channelLiveData"/>
原因是我希望能够在应用程序脱机时接收消息。当我的应用程序重新启动时,我希望它接收在我不在时发布的任何QoS>0消息。
现在我注意到了一些奇怪的事情:我的应用程序在停机后不会接收到丢失的QoS>0消息。
我记录了一个简单的场景,
- 春季集成开始
- 向主题发送QoS1消息,并由Spring Integration接收
- Spring Integration退出
- 向主题发送QoS1消息
- 春季集成开始
- Spring Integration未接收到在脱机时发送的QoS1消息
原因如下(从下面的日志中可以看出):
- 弹簧集成退出
- 它取消订阅该主题
- 它断开了客户端的连接
这本质上是告诉代理,这个客户端不再对这些消息感兴趣。当我的应用程序关闭时,代理程序不再为我保留这些QoS>0的消息。
当我的应用程序再次启动时,它无法接收在关闭时发布的QoS>0消息。
1448917620: New connection from 127.0.0.1 on port 1883.
1448917620: New client connected from 127.0.0.1 as client1 (c0, k60).
1448917620: Sending CONNACK to client1 (0, 0)
1448917620: Received SUBSCRIBE from client1
1448917620: liveData (QoS 1)
1448917620: Sending SUBACK to client1
1448917632: New connection from ::1 on port 1883.
1448917632: New client connected from ::1 as mosqpub/25936-MacBook-P (c1, k60, u'system').
1448917632: Sending CONNACK to mosqpub/25936-MacBook-P (0, 0)
1448917632: Received PUBLISH from mosqpub/25936-MacBook-P (d0, q1, r0, m1, 'liveData', ... (68 bytes))
1448917632: Sending PUBACK to mosqpub/25936-MacBook-P (Mid: 1)
1448917632: Sending PUBLISH to client1 (d0, q1, r0, m1, 'liveData', ... (68 bytes))
1448917632: Received DISCONNECT from mosqpub/25936-MacBook-P
1448917632: Client mosqpub/25936-MacBook-P disconnected.
1448917633: Received PUBACK from client1 (Mid: 1)
1448917643: Received UNSUBSCRIBE from client1
1448917643: liveData
1448917643: Received DISCONNECT from client1
1448917643: Client client1 disconnected.
1448917648: New connection from ::1 on port 1883.
1448917648: New client connected from ::1 as mosqpub/25945-MacBook-P (c1, k60, u'system').
1448917648: Sending CONNACK to mosqpub/25945-MacBook-P (0, 0)
1448917648: Received PUBLISH from mosqpub/25945-MacBook-P (d0, q1, r0, m1, 'liveData', ... (68 bytes))
1448917648: Sending PUBACK to mosqpub/25945-MacBook-P (Mid: 1)
1448917648: Received DISCONNECT from mosqpub/25945-MacBook-P
1448917648: Client mosqpub/25945-MacBook-P disconnected.
1448917665: New connection from 127.0.0.1 on port 1883.
1448917665: Client client1 disconnected.
1448917665: New client connected from 127.0.0.1 as client1 (c0, k60).
1448917665: Sending CONNACK to client1 (1, 0)
1448917665: Received SUBSCRIBE from client1
1448917665: liveData (QoS 1)
1448917665: Sending SUBACK to client1
我使用mosquito客户端工具运行了这个场景,退出mosquito订阅者会断开客户端的连接,但不会取消订阅主题
1448917534: New connection from ::1 on port 1883.
1448917534: New client connected from ::1 as client1 (c0, k60).
1448917534: Sending CONNACK to client1 (0, 0)
1448917534: Received SUBSCRIBE from client1
1448917534: liveData (QoS 1)
1448917534: Sending SUBACK to client1
1448917550: New connection from ::1 on port 1883.
1448917550: New client connected from ::1 as mosqpub/25879-MacBook-P (c1, k60, u'system').
1448917550: Sending CONNACK to mosqpub/25879-MacBook-P (0, 0)
1448917550: Received PUBLISH from mosqpub/25879-MacBook-P (d0, q1, r0, m1, 'liveData', ... (68 bytes))
1448917550: Sending PUBACK to mosqpub/25879-MacBook-P (Mid: 1)
1448917550: Sending PUBLISH to client1 (d0, q1, r0, m1, 'liveData', ... (68 bytes))
1448917550: Received DISCONNECT from mosqpub/25879-MacBook-P
1448917550: Client mosqpub/25879-MacBook-P disconnected.
1448917550: Received PUBACK from client1 (Mid: 1)
1448917553: Socket error on client client1, disconnecting.
1448917554: New connection from ::1 on port 1883.
1448917554: New client connected from ::1 as mosqpub/25884-MacBook-P (c1, k60, u'system').
1448917554: Sending CONNACK to mosqpub/25884-MacBook-P (0, 0)
1448917554: Received PUBLISH from mosqpub/25884-MacBook-P (d0, q1, r0, m1, 'liveData', ... (68 bytes))
1448917554: Sending PUBACK to mosqpub/25884-MacBook-P (Mid: 1)
1448917554: Received DISCONNECT from mosqpub/25884-MacBook-P
1448917555: Client mosqpub/25884-MacBook-P disconnected.
1448917556: New connection from ::1 on port 1883.
1448917556: Client client1 disconnected.
1448917556: New client connected from ::1 as client1 (c0, k60).
1448917556: Sending CONNACK to client1 (0, 0)
1448917556: Sending PUBLISH to client1 (d0, q1, r0, m2, 'liveData', ... (68 bytes))
1448917556: Received SUBSCRIBE from client1
1448917556: liveData (QoS 1)
1448917556: Sending SUBACK to client1
1448917556: Received PUBACK from client1 (Mid: 2)
知道如何处理这种情况吗?
编辑:
在实现已接受答案中提出的解决方法时,我会出现以下错误。我的Spring上下文是从Web应用程序加载的。我已经尝试将IgnoreUnsubscribePahoClientFactory放在一个单独的JAR(与spring-integration/paho级别相同)以及webapp类本身中。
2015-12-02 15:47:43,703 ERROR org.springframework.integration.handler.LoggingHandler - org.springframework.aop.framework.AopConfigException: Could not generate CGLIB subclass of class [class org.eclipse.paho.client.mqttv3.MqttAsyncClient]: Common causes of this problem include using a final class or a non-visible class; nested exception is org.springframework.cglib.core.CodeGenerationException: java.lang.reflect.InvocationTargetException-->null
at org.springframework.aop.framework.CglibAopProxy.getProxy(CglibAopProxy.java:206)
at org.springframework.aop.framework.ProxyFactoryBean.getProxy(ProxyFactoryBean.java:368)
at org.springframework.aop.framework.ProxyFactoryBean.getSingletonInstance(ProxyFactoryBean.java:322)
at org.springframework.aop.framework.ProxyFactoryBean.getObject(ProxyFactoryBean.java:246)
at com.ecs.vdm.rest.integration.IgnoreUnsubscribePahoClientFactory.proxy(IgnoreUnsubscribePahoClientFactory.java:62)
at com.ecs.vdm.rest.integration.IgnoreUnsubscribePahoClientFactory.getAsyncClientInstance(IgnoreUnsubscribePahoClientFactory.java:43)
at org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter.connectAndSubscribe(MqttPahoMessageDrivenChannelAdapter.java:216)
at org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter.access$300(MqttPahoMessageDrivenChannelAdapter.java:45)
at org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter$1.run(MqttPahoMessageDrivenChannelAdapter.java:272)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.springframework.cglib.core.CodeGenerationException: java.lang.reflect.InvocationTargetException-->null
at org.springframework.cglib.core.AbstractClassGenerator.create(AbstractClassGenerator.java:237)
at org.springframework.cglib.proxy.Enhancer.createHelper(Enhancer.java:377)
at org.springframework.cglib.proxy.Enhancer.createClass(Enhancer.java:317)
at org.springframework.aop.framework.ObjenesisCglibAopProxy.createProxyClassAndInstance(ObjenesisCglibAopProxy.java:57)
at org.springframework.aop.framework.CglibAopProxy.getProxy(CglibAopProxy.java:202)
... 16 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.GeneratedMethodAccessor37.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.springframework.cglib.core.ReflectUtils.defineClass(ReflectUtils.java:384)
at org.springframework.cglib.core.AbstractClassGenerator.create(AbstractClassGenerator.java:219)
... 20 more
Caused by: java.lang.SecurityException: class "org.eclipse.paho.client.mqttv3.MqttAsyncClient$$EnhancerBySpringCGLIB$$d14754a9_4603"'s signer information does not match signer information of other classes in the same package
at java.lang.ClassLoader.checkCerts(ClassLoader.java:952)
at java.lang.ClassLoader.preDefineClass(ClassLoader.java:666)
at java.lang.ClassLoader.defineClass(ClassLoader.java:794)
... 25 more
这是一个错误-它在stop()
期间无条件取消订阅。
我看不到简单的工作;但我有几个想法;如果我有东西,我会在这里张贴。
在此期间,请打开一个JIRA问题。
编辑
Gist Here
这有点像大锤,但它应该对你有用;它有效地忽略了对客户端上的CCD_ 2的调用。它可以变得更复杂一点,只在QOS>0时忽略,但这会涉及更多。
如果您已经在使用DefaultMqttPahoClientFactory
,只需将bean类更改为这个类即可。如果您当前没有使用工厂,请将其声明为bean,并使用client-factory
属性将其提供给适配器。
我们将在即将发布的版本中正确修复它。