在客户端使用 CometD 获取传输和 EOF 异常



我在使用 CometD 时在客户端收到传输异常和 EOF 异常。我试图通过CometD文档找到解决方案,我也用谷歌搜索但找不到任何解决方案。

以下是我得到的异常的堆栈跟踪

CommetD Connected!!!!!!!
cometDURL http://192.168.210.106:8080/cometd2/cometd
subscriberChannel /service/java
In finally...........
2013-09-12 10:20:26,200 [HttpClient-201] INFO  org.cometd.client.BayeuxClient.180910783     - Messages failed [{id=31, connectionType=long-polling, channel=/meta/connect,    clientId=2qpitkhfcjh1g511al4xal9vltg}]
 org.cometd.common.TransportException at org.cometd.client.transport.LongPollingTransport$TransportExchange.onResponseComplete(LongPollingTransport.java:334) at org.eclipse.jetty.client.HttpExchange$Listener.onResponseComplete(HttpExchange.java:999)
            at org.eclipse.jetty.client.HttpExchange.setStatus(HttpExchange.java:288)
            at org.eclipse.jetty.client.HttpConnection$Handler.messageComplete(HttpConnection.java:628)
            at org.eclipse.jetty.http.HttpParser.parseNext(HttpParser.java:786)
            at org.eclipse.jetty.http.HttpParser.parseAvailable(HttpParser.java:220)
            at org.eclipse.jetty.client.HttpConnection.handle(HttpConnection.java:275)
            at org.eclipse.jetty.io.nio.SelectChannelEndPoint.handle(SelectChannelEndPoint.java:545)
            at org.eclipse.jetty.io.nio.SelectChannelEndPoint$1.run(SelectChannelEndPoint.java:43)
            at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:529)
            at java.lang.Thread.run(Thread.java:662)
java.io.EOFException: closed: TransportExchange@50115a3f=POST//192.168.145.210:8080/cometd2/cometd#WAITING(64ms)sent=64ms
            at org.eclipse.jetty.client.HttpConnection.close(HttpConnection.java:663)
            at org.eclipse.jetty.client.HttpDestination.close(HttpDestination.java:639)
            at org.eclipse.jetty.client.HttpClient.doStop(HttpClient.java:491)
            at org.eclipse.jetty.util.component.AbstractLifeCycle.stop(AbstractLifeCycle.java:83)
            at com.commetd.CommetD.publish(CommetD.java:58)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
            at java.lang.reflect.Method.invoke(Method.java:597)
            at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:301)
            at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:182)
            at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:149)
            at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:106)
            at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:171)
            at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:171)
            at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:171)
            at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
            at java.lang.reflect.Method.invoke(Method.java:597)
            at sun.reflect.GeneratedMethodAccessor72.invoke(Unknown Source)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
            at java.lang.reflect.Method.invoke(Method.java:597)
            at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:301)
            at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:182)
            at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:149)
            at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:106)
            at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:171)
            at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:171)
            at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:171)
            at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
            at $Proxy25.callProxy(Unknown Source)
            at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
            at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
            at java.lang.reflect.Method.invoke(Method.java:597)
            at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:301)
            at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:182)
            at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:149)
            at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:106)
            at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:171)
            at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:171)
            at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:171)
            at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
            at $Proxy171.publishEventToBulletinManager(Unknown Source)
            at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
            at java.lang.Thread.run(Thread.java:662)

我使用以下代码连接到 CometD 服务器

public static boolean publish(String cometDURL,String subscriberChannel,Map<String,Object> msgToPublish) throws Exception{
    boolean isSuccess = true;
    HttpClient httpClient = new HttpClient();
    BayeuxClient client = null;
    ClientTransport transport=null;
    // Here setup Jetty's HttpClient, for example:
    httpClient.setMaxConnectionsPerAddress(10);
    try {
        httpClient.start();
        Map<String, Object> options = new HashMap<String, Object>();
        transport = LongPollingTransport.create(options, httpClient);
        client = new BayeuxClient(cometDURL, transport);
        client.handshake();

        boolean handshaken = client.waitFor(1000, BayeuxClient.State.CONNECTED);
        if(handshaken){
            System.out.println("CommetD Connected!!!!!!!");
            /*Map<String,Object> data = new HashMap<String,Object>();
            data.put("name", "Java msg......... Dynamic msg.....");*/
            // Fill in the data
            client.getChannel(subscriberChannel).publish(msgToPublish);
        }else{
            isSuccess=false;
            System.out.println("CommetD not connected:-(");
        }
    } catch (Exception e) {
        isSuccess=false;
        // TODO Auto-generated catch block
        e.printStackTrace();
    }finally{
        client.disconnect();
        client.waitFor(1000, BayeuxClient.State.DISCONNECTED);
        transport.terminate();
        httpClient.stop();
    }
    return isSuccess;
}

服务器端代码为

@Listener("/service/java")
public void processMsgFromJava(ServerSession remote, ServerMessage.Mutable message)
{
    System.out.println("Received msg from java..................");
    Map<String, Object> input = message.getDataAsMap();
    String eventId = (String)input.get("eventID");
    String updatedDate = (String)input.get("updatedDate");
   // String channelName =(String)input.get("ChannelToPublish");
    System.out.println("msg received : "+eventId+":"+updatedDate);
   // Map<String, Object> output = new HashMap<String, Object>();
    //output.put("greeting", name);
   // remote.deliver(serverSession, "/java/test", output, null);
   String channelName = "/java/test";
    // Initialize the channel, making it persistent and lazy
    bayeux.createIfAbsent(channelName, new ConfigurableServerChannel.Initializer()
    {
        public void configureChannel(ConfigurableServerChannel channel)
        {
            channel.setPersistent(true);
            channel.setLazy(true);
        }
    });
   // System.out.println("HHHHHHHHHHHHREEEEEEEEEEEEEEEEEEEEEEEEERRRRRRRRRRRRRRRRRRRRRRRRRRRRREEEEEEEEEE");
    // Convert the Update business object to a CometD-friendly format
   /* Map<String, Object> data = new HashMap<String, Object>();
    data.put("javamsg", name);*/

    // Publish to all subscribers
    ServerChannel channel = bayeux.getChannel(channelName);
    channel.publish(serverSession, input, null);
}

客户端的JavaScript:

(function($)
{
 var cometd = $.cometd;
$(document).ready(function()
{
    function _connectionEstablished()
    {
        $('#body').append('<div>CometD Connection Established</div>');
    }
    function _connectionBroken()
    {
        $('#body').append('<div>CometD Connection Broken</div>');
    }
    function _connectionClosed()
    {
        $('#body').append('<div>CometD Connection Closed</div>');
    }
    // Function that manages the connection status with the Bayeux server
    var _connected = false;
    function _metaConnect(message)
    {
        if (cometd.isDisconnected())
        {
            _connected = false;
            _connectionClosed();
            return;
        }
        var wasConnected = _connected;
        _connected = message.successful === true;
        if (!wasConnected && _connected)
        {
            _connectionEstablished();
        }
        else if (wasConnected && !_connected)
        {
            _connectionBroken();
        }
    }
    // Function invoked when first contacting the server and
    // when the server has lost the state of this client
    function _metaHandshake(handshake)
    {
        if (handshake.successful === true)
        {
            cometd.batch(function()
            {
                cometd.subscribe('/java/test', function(message)
                {
                    $('#body').append('<div>Server Says: ' + message.data.eventID + ':'+ message.data.updatedDate + '</div>');
                });
                // Publish on a service channel since the message is for the server only
               // cometd.publish('/service/hello', { name: 'World' });
            });
        }
    }
    // Disconnect when the page unloads
    $(window).unload(function()
    {
        cometd.disconnect(true);
    });
    var cometURL = location.protocol + "//" + location.host + config.contextPath + "/cometd";
    cometd.configure({
        url: cometURL,
        logLevel: 'debug'
    });
    cometd.addListener('/meta/handshake', _metaHandshake);
    cometd.addListener('/meta/connect', _metaConnect);
    cometd.handshake();
});
})(jQuery);

谁能指导我解决这个问题。我不确定我哪里出错了。

首先,你正在使用一种叫做com.commetd.CommetD的东西,它不是标准的CometD代码。不清楚您显示的代码是否是此类的代码。

假设它是原始 CometD 代码的简单包装器,堆栈跟踪解释了这一切:您正在从 publish() 方法中停止HttpClient,这毫无意义。

请仔细阅读 CometD 概念以及如何使用 CometD 客户端库。

最新更新