BlazeDS+ActiveMQ:Flex客户端与持久主题的非优雅断开连接不会将其从ActiveMQ中删除



我正试图让一个基于Flex的桌面应用程序使用BlazeDS的JMS桥,使用持久订阅来使用来自ActiveMQ主题的消息。基本场景如下:

  1. 消息由Flex客户端订阅的主题中的其他生产者生成。

  2. Flex客户端可能会不时离线,但当它再次连接到BlazeDS时,它必须接收离线时错过的所有消息。(当然,Flex客户端每次都使用相同的客户端ID进行连接(。

  3. 不能保证Flex客户端正常关闭。

如果我通过调用disconnect()显式地断开Flex端的消费者的连接,一切都会很好——我在应用程序的出口处理程序中这样做。然而,由于上面的#3,不能保证disconnect()一直被调用。当Flex客户端在未调用disconnect()的情况下关闭时,BlazeDS创建并关联到Flex客户端的"代理JMS客户端"的订阅似乎对ActiveMQ保持活动状态,因此ActiveMQ仍然认为客户端已登录。当Flex应用程序下次启动时,由于ActiveMQ拒绝订阅,它无法登录BlazeDS,声称客户端ID已经被占用。为什么会这样?我在这里可以做些什么来确保BlazeDS在其真正的Flex对等端意外终止时使ActiveMQ中的"代理JMS客户端"离线?

更多详细信息:一些调试显示:

  1. BlazeDS会意识到Flex客户端的终止,因为它在调试模式下会向控制台打印一些异常。消息如下:

    [BlazeDS]23:18:13.688 [WARN] Endpoint with id 'my-streaming-amf' is closing the streaming connection to FlexClient with id '71E6466F-D91F-201C-F60A-A6CB52F95D9F' because endpoint encountered a socket write error, possibly due to an unresponsive FlexClient.
    ClientAbortException:  java.net.SocketException: Broken pipe
        at org.apache.catalina.connector.OutputBuffer.doFlush(OutputBuffer.java:319)
        at org.apache.catalina.connector.OutputBuffer.flush(OutputBuffer.java:288)
        at org.apache.catalina.connector.Response.flushBuffer(Response.java:542)
        at org.apache.catalina.connector.ResponseFacade.flushBuffer(ResponseFacade.java:279)
        at flex.messaging.endpoints.BaseStreamingHTTPEndpoint.handleFlexClientStreamingOpenRequest(BaseStreamingHTTPEndpoint.java:818)
        at flex.messaging.endpoints.BaseStreamingHTTPEndpoint.serviceStreamingRequest(BaseStreamingHTTPEndpoint.java:1055)
        at flex.messaging.endpoints.BaseStreamingHTTPEndpoint.service(BaseStreamingHTTPEndpoint.java:460)
        at flex.messaging.MessageBrokerServlet.service(MessageBrokerServlet.java:353)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:803)
        at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:290)
        at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
        at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:233)
        at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:175)
        at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:128)
        at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:102)
        at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:109)
        at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:263)
        at org.apache.coyote.http11.Http11Processor.process(Http11Processor.java:844)
        at org.apache.coyote.http11.Http11Protocol$Http11ConnectionHandler.process(Http11Protocol.java:584)
        at org.apache.tomcat.util.net.JIoEndpoint$Worker.run(JIoEndpoint.java:447)
        at java.lang.Thread.run(Thread.java:680)
    Caused by: java.net.SocketException: Broken pipe
        at java.net.SocketOutputStream.socketWrite0(Native Method)
        at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:92)
        at java.net.SocketOutputStream.write(SocketOutputStream.java:136)
        at org.apache.coyote.http11.InternalOutputBuffer.realWriteBytes(InternalOutputBuffer.java:737)
        at org.apache.tomcat.util.buf.ByteChunk.flushBuffer(ByteChunk.java:434)
        at org.apache.coyote.http11.InternalOutputBuffer.flush(InternalOutputBuffer.java:299)
        at org.apache.coyote.http11.Http11Processor.action(Http11Processor.java:963)
        at org.apache.coyote.Response.action(Response.java:183)
        at org.apache.catalina.connector.OutputBuffer.doFlush(OutputBuffer.java:314)
        ... 20 more
    [BlazeDS]23:18:13.689 [DEBUG] Streaming thread 'http-8400-1' for endpoint with id 'my-streaming-amf' is releasing connection and returning to the request handler pool.
    [BlazeDS]23:18:13.689 [INFO] Number of streaming clients for FlexSession with id '5BC5E8D604A361BCA673B05AC624CCC1' is 0.
    [BlazeDS]23:18:13.689 [DEBUG] Number of streaming clients for endpoint with id 'my-streaming-amf' is 0.
    

    在这个阶段,订阅仍然在ActiveMQ web管理界面上显示为活动。

  2. 从控制台用kill -9杀死BlazeDS(更准确地说,是托管它的Tomcat服务器(,使ActiveMQ立即意识到"代理JMS客户端"已经不存在,它在ActiveMQ web管理界面上变得离线。这让我得出结论,BlazeDS显式地保持了代理JMS客户端的活动状态,因为kill -9没有给BlazeDS取消订阅客户端的机会,但它在ActiveMQ中仍然处于脱机状态。

因此,问题再次出现:当BlazeDS的真正Flex客户端意外终止时,我能做些什么来确保它在ActiveMQ中使"代理JMS客户端"离线?这是BlazeDS中的一个错误,还是我只是缺少了一些可以让它工作的隐藏配置设置?

版本信息:BlazeDS-4.0、ActiveMQ 5.5.0,均为今日新下载。我在BlazeDS交钥匙中使用Tomcat服务器,但ActiveMQ是单独安装的,因为BlazeDS的交钥匙只附带了ActiveMQ 4.1.1。顺便说一句,那个版本的ActiveMQ也有同样的问题。

问题是BlazeDS无法检测到您的Flex客户端已关闭,您必须实现自己的机制-我的建议是使用消息传递实现的心跳。如果在一段时间后没有收到来自客户端的消息,您可以假设Flex客户端已经离开并进行断开连接(或者您可以在服务器上使用会话超时机制,并在会话过期时进行断开(。

你所看到的(流媒体频道关闭时捕捉到的异常(不足以100%确定Flex客户端已经消失。流式传输是使用永久打开的HTTP连接(用于发送服务器消息(和定期的HTTP后调用(由客户端启动以发送消息(来实现的。在某些网络中,防火墙可能会在几秒钟后决定终止HTTP连接,你会收到与你发布的错误相同的错误。然而,这并不意味着Flex客户端被杀死——在这种情况下,Flex客户端可以使用回退策略并切换到短/长轮询。事实上,如果BlazeDS在这种情况下自动断开JMS连接,那将是一个错误。

最新更新