如何从websocket端点访问响应式Panache实体会话管理器?(Quarkus,Mutiny,Panache,Re



我正在尝试构建一个示例应用程序,在该应用程序中,消息可以保存到数据库中,然后当websocket与会话连接时,这些消息保存在会话下,它们就会全部发送出去。

这是我的代码:

Message.java
---
package org.acme.hibernate.orm.panache;
import java.util.Date;
import javax.persistence.Cacheable;
import javax.persistence.Entity;
import io.quarkus.panache.common.Sort;
import io.quarkus.hibernate.reactive.panache.PanacheEntity;
import io.smallrye.mutiny.Multi;
@Entity
@Cacheable
public class Message extends PanacheEntity {
public String sessionId;
public String content;
public Date timestamp;
public Message() {
this.timestamp = new Date();
}
public Message(String content, String sessionId) {
this.content = content;
this.timestamp = new Date();
this.sessionId = sessionId;
}  
public static Multi<Message> getBySessionId(String sessionId) {
return stream("sessionId", Sort.by("timestamp"), sessionId);
}
}
MessageResource.java
---
package org.acme.hibernate.orm.panache;
import java.util.Date;
import java.util.List;
import static javax.ws.rs.core.Response.Status.CREATED;
import javax.inject.Inject;
import javax.transaction.Transactional;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.ext.ExceptionMapper;
import javax.ws.rs.ext.Provider;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.quarkus.hibernate.reactive.panache.Panache;
import io.smallrye.mutiny.CompositeException;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;

@Path("/message")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
public class MessageResource {

@GET
public Uni<List<Message>> list() {
return Message.listAll();
}
@GET
@Path("/{sessionId}")
public Multi<Message> listBySessionId(@PathParam("sessionId") String sessionId) {
return Message.getBySessionId(sessionId);
}
@POST
@Transactional
public Uni<Response> create(Message message) {
message.timestamp = new Date();
message.persist();
return Panache.withTransaction(message::persist)
.replaceWith(Response.ok(message).status(CREATED)::build);
}

/**
* Create a HTTP response from an exception.
*
* Response Example:
*
* <pre>
* HTTP/1.1 422 Unprocessable Entity
* Content-Length: 111
* Content-Type: application/json
*
* {
*     "code": 422,
*     "error": "Fruit name was not set on request.",
*     "exceptionType": "javax.ws.rs.WebApplicationException"
* }
* </pre>
*/
@Provider
public static class ErrorMapper implements ExceptionMapper<Exception> {
@Inject
ObjectMapper objectMapper;
@Override
public Response toResponse(Exception exception) {
Throwable throwable = exception;
int code = 500;
if (throwable instanceof WebApplicationException) {
code = ((WebApplicationException) exception).getResponse().getStatus();
}
// This is a Mutiny exception and it happens, for example, when we try to insert a new
// fruit but the name is already in the database
if (throwable instanceof CompositeException) {
throwable = ((CompositeException) throwable).getCause();
}
ObjectNode exceptionJson = objectMapper.createObjectNode();
exceptionJson.put("exceptionType", throwable.getClass().getName());
exceptionJson.put("code", code);
if (exception.getMessage() != null) {
exceptionJson.put("error", throwable.getMessage());
}
return Response.status(code)
.entity(exceptionJson)
.build();
}
}
}
ReplaySocket.java
---
package org.acme.websockets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import io.quarkus.hibernate.reactive.panache.PanacheEntityBase;
import io.quarkus.panache.common.Sort;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import org.acme.hibernate.orm.panache.Message;
import org.acme.hibernate.orm.panache.MessageResource;
import org.jboss.logging.Logger;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
@ServerEndpoint("/replay/{sessionId}")         
@ApplicationScoped
public class ReplaySocket {
@Inject
MessageResource messageResource;
private static final Logger LOG = Logger.getLogger(ReplaySocket.class.getName());

Map<String, Session> sessions = new ConcurrentHashMap<>(); 
@OnOpen
public void onOpen(Session session, @PathParam("sessionId") String sessionId) {
sessions.put(sessionId, session);
broadcast("Starting");
replayMessages(sessionId);
}
private void replayMessages(String sessionId) {
Multi<Message> messages = Message.stream("sessionId", Sort.by("timestamp"), sessionId);

messages.subscribe().with(
message -> broadcast(message.content),
failure -> System.out.println(failure)
);
}
private void broadcast(String message) {
sessions.values().forEach(s -> {
s.getAsyncRemote().sendObject(message, result ->  {
if (result.getException() != null) {
System.out.println("Unable to send message: " + result.getException());
}
});
});
}
}

当我运行这个时,我可以从MessageResource端点保存和获取。然而,当我试图在websocket中获取消息时,我会收到以下错误:

2021-07-13 10:23:57,016 ERROR [org.hib.rea.errors] (vert.x-eventloop-thread-8) failed to execute statement [select message0_.id as id1_0_, message0_.content as content2_0_, message0_.sessionId as sessioni3_0_, message0_.timestamp as timestam4_0_ from Message message0_ where message0_.sessionId=$1 order by message0_.timestamp]
2021-07-13 10:23:57,017 ERROR [org.hib.rea.errors] (vert.x-eventloop-thread-8) could not execute query: java.util.concurrent.CompletionException: java.lang.IllegalStateException: Session/EntityManager is closed
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1081)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
at io.vertx.core.Future.lambda$toCompletionStage$2(Future.java:360)
at io.vertx.core.Future$$Lambda$889/0x0000000000000000.handle(Unknown Source)
at io.vertx.core.impl.future.FutureImpl$3.onSuccess(FutureImpl.java:124)
at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:62)
at io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:179)
at io.vertx.core.impl.future.PromiseImpl.tryComplete(PromiseImpl.java:23)
at io.vertx.sqlclient.impl.QueryResultBuilder.tryComplete(QueryResultBuilder.java:102)
at io.vertx.sqlclient.impl.QueryResultBuilder.tryComplete(QueryResultBuilder.java:35)
at io.vertx.core.Promise.complete(Promise.java:66)
at io.vertx.core.Promise.handle(Promise.java:51)
at io.vertx.core.Promise.handle(Promise.java:29)
at io.vertx.core.impl.future.FutureImpl$3.onSuccess(FutureImpl.java:124)
at io.vertx.core.impl.future.FutureBase.lambda$emitSuccess$0(FutureBase.java:54)
at io.vertx.core.impl.future.FutureBase$$Lambda$625/0x0000000000000000.run(Unknown Source)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:836)
Caused by: java.lang.IllegalStateException: Session/EntityManager is closed
at org.hibernate.internal.AbstractSharedSessionContract.checkOpen(AbstractSharedSessionContract.java:393)
at org.hibernate.engine.spi.SharedSessionContractImplementor.checkOpen(SharedSessionContractImplementor.java:148)
at org.hibernate.reactive.session.impl.ReactiveSessionImpl.checkOpen(ReactiveSessionImpl.java:1561)
at org.hibernate.internal.AbstractSharedSessionContract.checkOpenOrWaitingForAutoClose(AbstractSharedSessionContract.java:399)
at org.hibernate.internal.SessionImpl.getEntityUsingInterceptor(SessionImpl.java:592)
at org.hibernate.loader.Loader.getRow(Loader.java:1609)
at org.hibernate.loader.Loader.getRowFromResultSet(Loader.java:747)
at org.hibernate.loader.Loader.getRowsFromResultSet(Loader.java:1046)
at org.hibernate.reactive.loader.hql.impl.ReactiveQueryLoader.getRowsFromResultSet(ReactiveQueryLoader.java:223)
at org.hibernate.reactive.loader.ReactiveLoaderBasedResultSetProcessor.reactiveExtractResults(ReactiveLoaderBasedResultSetProcessor.java:72)
at org.hibernate.reactive.loader.hql.impl.ReactiveQueryLoader$1.reactiveExtractResults(ReactiveQueryLoader.java:72)
at org.hibernate.reactive.loader.ReactiveLoader.reactiveProcessResultSet(ReactiveLoader.java:146)
at org.hibernate.reactive.loader.ReactiveLoader.lambda$doReactiveQueryAndInitializeNonLazyCollections$0(ReactiveLoader.java:77)
at org.hibernate.reactive.loader.ReactiveLoader$$Lambda$1220/0x0000000000000000.apply(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072)
... 23 more
java.lang.IllegalStateException: Session/EntityManager is closed

我哪里错了?我试图做的事情根本无效吗?如果是的话,你能推荐一些不同的吗?

谢谢!

我找到了一种访问响应式panache实体的方法,但由于某种原因,它只工作了3次,然后其他调用由于超时而失败。

@ServerEndpoint("/websocket/{name}")
@ApplicationScoped
class StartWebSocket(
val sessionFactory: SessionFactory
) {
@OnOpen
fun onOpen(session: Session, @PathParam("name") name: String) {
sessionFactory.withSession { SimpleUser.findByName(name) }.subscribe().with{ println("Found: ${it?.username}") }
}
}

如您所见,来自DB的响应由一个名为vert.x-eventloop-thread-8的线程处理。这是意料之中的:来自反应式sql驱动程序的所有响应都在eventloop线程上传递。

您看到的IllegalStateException很可能是由于会话没有在事件循环线程上打开这一事实造成的,这会导致一些问题,例如当所有这些代码都设计为在单个事件循环中运行时,您会让它多线程运行(并且不需要多个线程,更不用说让它多螺纹会降低性能(。您应该能够通过在调用Panache Reactive之前从代码中记录线程名称来验证这一点:我打赌它不是vert.x-eventloop-thread-X

换句话说,Panache Reactive被设计为在完全反应的上下文中使用。

由于Quarkus 2.1,Panache Reactive应该能够检测到这样的模式,并将操作卸载到正确的线程上。

最新更新