如何避免数据随着客户端连接数量的增加而增加两倍或三倍



我使用blazeDs发布/订阅方法和flex_sdk_4.1来开发一个flex应用程序,该应用程序显示使用esper生成的实时事件的数据网格和图表。用actionscript编写的messageHandler()检索通过JMS传递的事件。当我在Tomcat Server中运行这个应用程序时,它对于一个连接运行得很好,但是,如果同时增加客户机连接的数量,则事件会根据连接的数量增加一倍或三倍。我需要显示数据而不重复。谁能帮我修复这个bug ?提前感谢。以下代码是flex配置文件,供您参考。

Messaging-config.xml :

<?xml version="1.0" encoding="UTF-8"?>
<service id="message-service" 
    class="flex.messaging.services.MessageService">
    <adapters>
        <adapter-definition id="actionscript" class="flex.messaging.services.messaging.adapters.ActionScriptAdapter" default="true" />
        <!-- <adapter-definition id="jms" class="flex.messaging.services.messaging.adapters.JMSAdapter"/> -->
    </adapters>
    <default-channels>
        <channel ref="my-longpolling-amf"/>
    </default-channels>
   <destination id="sensordata">
    <properties>    
      <server>
         <allow-subtopics>true</allow-subtopics>    
      </server>
    </properties>
   </destination>
</service>

Remoting-config.xml :

<?xml version="1.0" encoding="UTF-8"?>
<service id="remoting-service" 
    class="flex.messaging.services.RemotingService">
    <adapters>
        <adapter-definition id="java-object" class="flex.messaging.services.remoting.adapters.JavaAdapter" default="true"/>
    </adapters>
    <default-channels>
        <channel ref="my-amf"/>
    </default-channels>
<destination id="Esperjms">
    <properties>
    <source>jmsesper.flexserver.jmsConsumer.jmsConsumer</source>
    </properties>       
</destination>

services.config.xml :

<?xml version="1.0" encoding="UTF-8"?>
<services-config>
    <services>
        <service-include file-path="remoting-config.xml" />
        <service-include file-path="proxy-config.xml" />    
        <service-include file-path="messaging-config.xml" />        
    </services>
    <security>
        <login-command class="flex.messaging.security.TomcatLoginCommand" server="Tomcat"/>
        <!-- Uncomment the correct app server
        <login-command class="flex.messaging.security.TomcatLoginCommand" server="JBoss">
        <login-command class="flex.messaging.security.JRunLoginCommand" server="JRun"/>        
        <login-command class="flex.messaging.security.WeblogicLoginCommand" server="Weblogic"/>
        <login-command class="flex.messaging.security.WebSphereLoginCommand" server="WebSphere"/>
        -->
        <!-- 
        <security-constraint id="basic-read-access">
            <auth-method>Basic</auth-method>
            <roles>
                <role>guests</role>
                <role>accountants</role>
                <role>employees</role>
                <role>managers</role>
            </roles>
        </security-constraint>
         -->
    </security>
    <channels>
     <channel-definition id="my-amf" class="mx.messaging.channels.AMFChannel">
            <endpoint url="http://{server.name}:{server.port}/{context.root}/messagebroker/amf" class="flex.messaging.endpoints.AMFEndpoint"/>  
        </channel-definition>
        <channel-definition id="my-secure-amf" class="mx.messaging.channels.SecureAMFChannel">
            <endpoint url="https://{server.name}:{server.port}/{context.root}/messagebroker/amfsecure" class="flex.messaging.endpoints.SecureAMFEndpoint"/>
            <properties>
                <add-no-cache-headers>false</add-no-cache-headers>
            </properties>
        </channel-definition>   
        <channel-definition id="my-polling-amf" class="mx.messaging.channels.AMFChannel">
            <endpoint url="http://{server.name}:{server.port}/{context.root}/messagebroker/amfpolling" class="flex.messaging.endpoints.AMFEndpoint"/>
            <properties>
                <polling-enabled>true</polling-enabled>
                <polling-interval-seconds>1</polling-interval-seconds>
            </properties>
        </channel-definition>
    <channel-definition id="my-streaming-amf" class="mx.messaging.channels.StreamingAMFChannel">
            <endpoint url="http://{server.name}:{server.port}/{context.root}/messagebroker/streamingamf" class="flex.messaging.endpoints.StreamingAMFEndpoint"/>            
        <properties>            
        <user-agent-settings>
                    <user-agent match-on="MSIE" kickstart-bytes="2048" max-streaming-connections-per-session="1"/>
                    <user-agent match-on="Firefox" kickstart-bytes="0" max-streaming-connections-per-session="10"/>
            <user-agent match-on="Chrome" max-streaming-connections-per-session="5"/>
                </user-agent-settings>
        </properties>
        </channel-definition>
    <channel-definition id="my-longpolling-amf" class="mx.messaging.channels.AMFChannel">
        <endpoint url="http://{server.name}:{server.port}/{context.root}/messagebroker/amflongpolling" class="flex.messaging.endpoints.AMFEndpoint"/>
        <properties>
            <polling-enabled>true</polling-enabled>
            <polling-interval-seconds>5</polling-interval-seconds>
            <wait-interval-millis>60000</wait-interval-millis>
            <client-wait-interval-millis>1</client-wait-interval-millis>
            <max-waiting-poll-requests>200</max-waiting-poll-requests>
                    <user-agent-settings>
                <user-agent match-on="Chrome" max-streaming-connections-per-session="5"/>
                    </user-agent-settings>
        </properties>
    </channel-definition>   
        <!--
        <channel-definition id="my-http" class="mx.messaging.channels.HTTPChannel">
            <endpoint url="http://{server.name}:{server.port}/{context.root}/messagebroker/http" class="flex.messaging.endpoints.HTTPEndpoint"/>
        </channel-definition>
        <channel-definition id="my-secure-http" class="mx.messaging.channels.SecureHTTPChannel">
            <endpoint url="https://{server.name}:{server.port}/{context.root}/messagebroker/httpsecure" class="flex.messaging.endpoints.SecureHTTPEndpoint"/>
            <properties>
                <add-no-cache-headers>false</add-no-cache-headers>
            </properties>
        </channel-definition>
        -->
    </channels>
    <logging>
        <target class="flex.messaging.log.ConsoleTarget" level="Error">
            <properties>
                <prefix>[BlazeDS] </prefix>
                <includeDate>false</includeDate>
                <includeTime>false</includeTime>
                <includeLevel>false</includeLevel>
                <includeCategory>false</includeCategory>
            </properties>
            <filters>
                <pattern>Endpoint.*</pattern>
                <pattern>Service.*</pattern>
                <pattern>Configuration</pattern>
            </filters>
        </target>
    </logging>
    <system>
        <redeploy>
            <enabled>false</enabled>
            <!-- 
            <watch-interval>20</watch-interval>
            <watch-file>{context.root}/WEB-INF/flex/services-config.xml</watch-file>
            <watch-file>{context.root}/WEB-INF/flex/proxy-config.xml</watch-file>
            <watch-file>{context.root}/WEB-INF/flex/remoting-config.xml</watch-file>
            <watch-file>{context.root}/WEB-INF/flex/messaging-config.xml</watch-file>
            <watch-file>{context.root}/WEB-INF/flex/data-management-config.xml</watch-file>
            <touch-file>{context.root}/WEB-INF/web.xml</touch-file>
             -->
        </redeploy>
    </system>
</services-config>

应用程序集成了ActiveMQ,它实现了发布/订阅方法。在现有的场景中,会话是为'topic'创建的,该会话根据使用者启用的连接数增加一倍或三倍。

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
        javax.jms.Connection connection = connectionFactory.createConnection();
        connection.start();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // Destination represents here our queue 'TESTQUEUE' 
        Destination destination = session.**createTopic("TESTQUEUE")**;
        // MessageProducer is used for sending messages (as opposed to MessageConsumer which is used for receiving them)
        producer = session.createProducer(destination); 

当为Queue创建会话时,此错误被清除。该更改在生产者和消费者文件

中完成。
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
        javax.jms.Connection connection = connectionFactory.createConnection();
        connection.start();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // Destination represents here our queue 'TESTQUEUE' 
        Destination destination = session.**createQueue("TESTQUEUE")**;
        // MessageProducer is used for sending messages (as opposed to MessageConsumer which is used for receiving them)
        producer = session.createProducer(destination); 

最新更新