Jersey SSE-eventOutput.write在发送第一条消息后抛出null指针



我使用Jersey实现了一个Restful web接口,用于通过HTTP将从内部JMS发布者接收的消息发送到外部客户端。我已经设法将测试消息发送到Java客户端,但线程在完成write()执行之前抛出了一个空指针异常,关闭了连接并阻止了进一步的通信。

这是我的资源类:

@GET
@Path("/stream_data")
@Produces(SseFeature.SERVER_SENT_EVENTS)
public EventOutput getServerSentEvents(@Context ServletContext context){
    final EventOutput eventOutput = new EventOutput();
    new Thread( new ObserverThread(eventOutput, (MService) context.getAttribute("instance")) ).start();
    return eventOutput;
}

这是我线程的运行方法:

public class ObserverThread implements Observer, Runnable {
   //constructor sets eventOutput & mService objects
   //mService notifyObservers() called when JMS message received
   //text added to Thread's message queue to await sending to client 
   public void run() {
    try {
        String message = "{'symbol':'test','entryType'='0','price'='test'}";
        Thread.sleep(1000);
        OutboundEvent.Builder builder = new OutboundEvent.Builder();
        builder.mediaType(MediaType.APPLICATION_JSON_TYPE);
        builder.data(String.class, message);
        OutboundEvent event = builder.build();
        eventOutput.write(event);
        System.out.println(">>>>>>SSE CLIENT HAS BEEN REGISTERED!");
        mService.addObserver(this);
        while(!eventOutput.isClosed()){
            if(!updatesQ.isEmpty()){
                pushUpdate(updatesQ.dequeue());
            }
        }
        System.out.println("<<<<<<<SSE CLIENT HAS BEEN DEREGISTERED!");
    } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

这是我的客户代码:

Client client = ClientBuilder.newBuilder().register(SseFeature.class).build();
    WebTarget target = client.target(url);
    EventInput eventInput = target.request().get(EventInput.class);
    try {
        while (!eventInput.isClosed()) {
            eventInput.setChunkType(MediaType.WILDCARD_TYPE);
            final InboundEvent inboundEvent = eventInput.read();
            if (inboundEvent != null) {
                String theString = inboundEvent.readData();
                System.out.println(theString + "n");
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    }

我正在将"{'symbol':'test','entryType'='0','price'='test'}"测试消息打印到客户端控制台,但服务器随后打印NullPointerException,然后才能打印">>>SSE客户端注册"消息。这将关闭连接,因此客户端退出while循环并停止侦听更新。

我将该项目转换为webapp 3.0版本的facet,以便向web.xml添加一个异步支持的标记,但我收到了相同的空指针错误。我倾向于认为这是由servlet在返回第一条消息后结束Request/Response对象引起的,证据显示在堆栈跟踪中:

Exception in thread "Thread-20" java.lang.NullPointerException
at org.apache.coyote.http11.InternalOutputBuffer.realWriteBytes(InternalOutputBuffer.java:741)
at org.apache.tomcat.util.buf.ByteChunk.flushBuffer(ByteChunk.java:434)
at org.apache.coyote.http11.InternalOutputBuffer.flush(InternalOutputBuffer.java:299)
at org.apache.coyote.http11.Http11Processor.action(Http11Processor.java:981)
at org.apache.coyote.Response.action(Response.java:183)
at org.apache.catalina.connector.OutputBuffer.doFlush(OutputBuffer.java:314)
at org.apache.catalina.connector.OutputBuffer.flush(OutputBuffer.java:288)
at org.apache.catalina.connector.CoyoteOutputStream.flush(CoyoteOutputStream.java:98)
at org.glassfish.jersey.message.internal.CommittingOutputStream.flush(CommittingOutputStream.java:292)
at org.glassfish.jersey.server.ChunkedOutput$1.call(ChunkedOutput.java:241)
at org.glassfish.jersey.server.ChunkedOutput$1.call(ChunkedOutput.java:192)
at org.glassfish.jersey.internal.Errors.process(Errors.java:315)
at org.glassfish.jersey.internal.Errors.process(Errors.java:242)
at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:345)
at org.glassfish.jersey.server.ChunkedOutput.flushQueue(ChunkedOutput.java:192)
at org.glassfish.jersey.server.ChunkedOutput.write(ChunkedOutput.java:182)
at com.bpc.services.service.ObserverThread.run(MarketObserverThread.java:32)
at java.lang.Thread.run(Thread.java:745)
<<<<<<<SSE CLIENT HAS BEEN DEREGISTERED!

我也试过测试一个sse广播。在这种情况下,我没有看到任何抛出的异常,但一旦收到第一条消息,连接就会关闭,这让我相信是servlet中的某种东西迫使连接关闭。有人能建议我如何在服务器端调试这个吗?

在Jersey为ExecutorService实例注入@Context时,我遇到了一个类似的问题,这个问题似乎是一个长期存在的错误。在Sse(版本2.27)的当前实现中,

class JerseySse implements Sse {
    @Context
    private ExecutorService executorService;
    @Override
    public OutboundSseEvent.Builder newEventBuilder() {
        return new OutboundEvent.Builder();
    }
    @Override
    public SseBroadcaster newBroadcaster() {
        return new JerseySseBroadcaster(executorService);
    }
}

executorService字段从未初始化,因此在我的情况下,JerseySseBroadcaster会引发NullPointerException。我通过明确地触发注入来解决这个错误。

如果您将HK2用于CDI(Jersey的默认设置),则上面问题的解决方案的大致草图可能类似于以下内容:

@Singleton
@Path("...")
public class JmsPublisher {
    private Sse sse;
    private SseBroadcaster broadcaster;
    private final ExecutorService executor;
    private final BlockingQueue<String> jmsMessageQueue;
    ...
    @Context
    public void setSse(Sse sse, ServiceLocator locator) {
        locator.inject(sse); // Inject sse.executorService
        this.sse = sse;
        this.broadcaster = sse.newBroadcaster();
    }
    ...
    @GET
    @Path("/stream_data")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    public void register(SseEventSink eventSink) {
        broadcaster.register(eventSink);
    }
    ...
    @PostConstruct
    private void postConstruct() {
        executor.submit(() -> {
            try {
                while(true) {
                    String message = jmsMessageQueue.take();
                    broadcaster.broadcast(sse.newEventBuilder()
                        .mediaType(MediaType.APPLICATION_JSON_TYPE)
                        .data(String.class, message)
                        .build());
                }
            } catch(InterruptedException e) {
                Thread.currentThread().interrupt();
            }
       });
    }
    @PreDestroy
    private void preDestroy() {
        executor.shutdownNow();
    }
}

最新更新