我使用Jersey实现了一个Restful web接口,用于通过HTTP将从内部JMS发布者接收的消息发送到外部客户端。我已经设法将测试消息发送到Java客户端,但线程在完成write()执行之前抛出了一个空指针异常,关闭了连接并阻止了进一步的通信。
这是我的资源类:
@GET
@Path("/stream_data")
@Produces(SseFeature.SERVER_SENT_EVENTS)
public EventOutput getServerSentEvents(@Context ServletContext context){
final EventOutput eventOutput = new EventOutput();
new Thread( new ObserverThread(eventOutput, (MService) context.getAttribute("instance")) ).start();
return eventOutput;
}
这是我线程的运行方法:
public class ObserverThread implements Observer, Runnable {
//constructor sets eventOutput & mService objects
//mService notifyObservers() called when JMS message received
//text added to Thread's message queue to await sending to client
public void run() {
try {
String message = "{'symbol':'test','entryType'='0','price'='test'}";
Thread.sleep(1000);
OutboundEvent.Builder builder = new OutboundEvent.Builder();
builder.mediaType(MediaType.APPLICATION_JSON_TYPE);
builder.data(String.class, message);
OutboundEvent event = builder.build();
eventOutput.write(event);
System.out.println(">>>>>>SSE CLIENT HAS BEEN REGISTERED!");
mService.addObserver(this);
while(!eventOutput.isClosed()){
if(!updatesQ.isEmpty()){
pushUpdate(updatesQ.dequeue());
}
}
System.out.println("<<<<<<<SSE CLIENT HAS BEEN DEREGISTERED!");
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
这是我的客户代码:
Client client = ClientBuilder.newBuilder().register(SseFeature.class).build();
WebTarget target = client.target(url);
EventInput eventInput = target.request().get(EventInput.class);
try {
while (!eventInput.isClosed()) {
eventInput.setChunkType(MediaType.WILDCARD_TYPE);
final InboundEvent inboundEvent = eventInput.read();
if (inboundEvent != null) {
String theString = inboundEvent.readData();
System.out.println(theString + "n");
}
}
} catch (Exception e) {
e.printStackTrace();
}
我正在将"{'symbol':'test','entryType'='0','price'='test'}"测试消息打印到客户端控制台,但服务器随后打印NullPointerException,然后才能打印">>>SSE客户端注册"消息。这将关闭连接,因此客户端退出while循环并停止侦听更新。
我将该项目转换为webapp 3.0版本的facet,以便向web.xml添加一个异步支持的标记,但我收到了相同的空指针错误。我倾向于认为这是由servlet在返回第一条消息后结束Request/Response对象引起的,证据显示在堆栈跟踪中:
Exception in thread "Thread-20" java.lang.NullPointerException
at org.apache.coyote.http11.InternalOutputBuffer.realWriteBytes(InternalOutputBuffer.java:741)
at org.apache.tomcat.util.buf.ByteChunk.flushBuffer(ByteChunk.java:434)
at org.apache.coyote.http11.InternalOutputBuffer.flush(InternalOutputBuffer.java:299)
at org.apache.coyote.http11.Http11Processor.action(Http11Processor.java:981)
at org.apache.coyote.Response.action(Response.java:183)
at org.apache.catalina.connector.OutputBuffer.doFlush(OutputBuffer.java:314)
at org.apache.catalina.connector.OutputBuffer.flush(OutputBuffer.java:288)
at org.apache.catalina.connector.CoyoteOutputStream.flush(CoyoteOutputStream.java:98)
at org.glassfish.jersey.message.internal.CommittingOutputStream.flush(CommittingOutputStream.java:292)
at org.glassfish.jersey.server.ChunkedOutput$1.call(ChunkedOutput.java:241)
at org.glassfish.jersey.server.ChunkedOutput$1.call(ChunkedOutput.java:192)
at org.glassfish.jersey.internal.Errors.process(Errors.java:315)
at org.glassfish.jersey.internal.Errors.process(Errors.java:242)
at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:345)
at org.glassfish.jersey.server.ChunkedOutput.flushQueue(ChunkedOutput.java:192)
at org.glassfish.jersey.server.ChunkedOutput.write(ChunkedOutput.java:182)
at com.bpc.services.service.ObserverThread.run(MarketObserverThread.java:32)
at java.lang.Thread.run(Thread.java:745)
<<<<<<<SSE CLIENT HAS BEEN DEREGISTERED!
我也试过测试一个sse广播。在这种情况下,我没有看到任何抛出的异常,但一旦收到第一条消息,连接就会关闭,这让我相信是servlet中的某种东西迫使连接关闭。有人能建议我如何在服务器端调试这个吗?
在Jersey为ExecutorService
实例注入@Context
时,我遇到了一个类似的问题,这个问题似乎是一个长期存在的错误。在Sse
(版本2.27)的当前实现中,
class JerseySse implements Sse {
@Context
private ExecutorService executorService;
@Override
public OutboundSseEvent.Builder newEventBuilder() {
return new OutboundEvent.Builder();
}
@Override
public SseBroadcaster newBroadcaster() {
return new JerseySseBroadcaster(executorService);
}
}
executorService
字段从未初始化,因此在我的情况下,JerseySseBroadcaster
会引发NullPointerException
。我通过明确地触发注入来解决这个错误。
如果您将HK2用于CDI(Jersey的默认设置),则上面问题的解决方案的大致草图可能类似于以下内容:
@Singleton
@Path("...")
public class JmsPublisher {
private Sse sse;
private SseBroadcaster broadcaster;
private final ExecutorService executor;
private final BlockingQueue<String> jmsMessageQueue;
...
@Context
public void setSse(Sse sse, ServiceLocator locator) {
locator.inject(sse); // Inject sse.executorService
this.sse = sse;
this.broadcaster = sse.newBroadcaster();
}
...
@GET
@Path("/stream_data")
@Produces(MediaType.SERVER_SENT_EVENTS)
public void register(SseEventSink eventSink) {
broadcaster.register(eventSink);
}
...
@PostConstruct
private void postConstruct() {
executor.submit(() -> {
try {
while(true) {
String message = jmsMessageQueue.take();
broadcaster.broadcast(sse.newEventBuilder()
.mediaType(MediaType.APPLICATION_JSON_TYPE)
.data(String.class, message)
.build());
}
} catch(InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
@PreDestroy
private void preDestroy() {
executor.shutdownNow();
}
}