如何在Quarkus中通过REST从数据库流式传输大数据



我正在Quarkus中实现一个GET方法,该方法应向客户端发送大量数据。数据使用JPA/Hibernate从数据库中读取,序列化为JSON,然后发送到客户端。如何在不将全部数据存储在内存中的情况下高效地完成这项工作?我尝试了以下三种可能性,但都没有成功:

  1. 使用JPA中的getResultList并返回一个以列表为主体的ResponseMessageBodyWriter将负责将列表序列化为JSON。然而,这将把所有数据拉入内存,这对于大量记录来说是不可行的
  2. 使用JPA中的getResultStream,并返回一个以流为主体的ResponseMessageBodyWriter将负责将流序列化为JSON。不幸的是,这不起作用,因为在JAX-RS方法执行之后和调用MessageBodyWriter之前,EntityManager似乎已经关闭。这意味着底层ResultSet也已关闭,写入程序无法再从流中读取
  3. 使用StreamingOutput作为Response主体。与2中的问题相同。发生

所以我的问题是:用Quarkus通过JPA发送读取的大数据有什么诀窍?

您的结果必须是一个响应中的所有结果吗?让客户端请求下一个结果页面,直到没有下一个——一个典型的RESTneneneba API分页练习——怎么样?此外,JPA后端只会从数据库中获取该页面,因此不会有所有内容都存储在内存中的时刻。

根据您的需求,您有两个选项:

选项1:采用HAEOAS方法(https://restfulapi.net/hateoas/)。在REST标准上交换大型数据集的标准模式之一。因此,在这种方法中,服务器将在第一个响应中快速地使用一组HATEOAS URI进行响应。其中,每个HAEOAS URI表示一组元素。因此,您需要根据数据大小生成这些URI,并让客户端代码负责将这些URI作为RESTAPI单独调用,以获得实际数据。但在这个选项中,您也可以考虑Reactive风格,以获得更大的优势,利用小内存进行流式处理。

选项2:正如上面@Serkan所建议的,将结果集作为REST响应从数据库持续流式传输到客户端。在这里,您需要确保客户端和服务之间的网关设置超时。如果没有网关,你就是好的。因此,您可以利用所有层的反应式编程来实现连续流"DAO/数据访问层"-->quot;服务层"-->REST控制器-->客户Spring reactor也符合JAX-RS。https://quarkus.io/guides/getting-started-reactive.这是处理大型数据处理时的最佳架构风格。

这里有一些资源可以帮助您:

  • 使用反应式Hibernate:https://quarkusio.zulipchat.com/#narrow/stream/187030-users/topic/Large.20datasets.20using.20reactive.20SQL.20client
  • 分页与仅转发结果集:https://knes1.github.io/blog/2015/2015-10-19-streaming-mysql-results-using-java8-streams-and-spring-data.html

上一篇文章是针对SpringBoot的,但这个想法也可以用Quarkus实现。

------------编辑:

好的,我已经制定了一个例子,我做了一个批量选择。我用Panache做了,但没有它你也可以很容易地做。

我将返回一个ScrollableResult,然后在Rest资源中使用它,通过SSE(服务器发送的事件(将其流式传输到客户端。

------------编辑2:

我已将setFetchSize添加到查询中。你应该玩这个数字,并把它设置在1-50之间。如果值=1,那么db行将以1乘1的方式获取,这最像流式传输。并且它将使用最少的内存,但db&应用程序将更频繁。

在执行这样的批量操作时,强烈建议使用StatelessSession。

@Entity
public class Fruit extends PanacheEntity {
public String name;

// I've removed the logic from here to the Rest resource, 
// otherwise you cannot close the session

}
@Path("/fruits")
public class FruitResource {
@GET
@Produces(SERVER_SENT_EVENTS)
public void fruitsStream(@Context Sse sse, @Context SseEventSink sink) {
var sf = Fruit.getEntityManager().getEntityManagerFactory().unwrap(SessionFactory.class);
try (var session = sf.openStatelessSession();
var scrollableResults = session.createQuery("select f from Fruit f")
.scroll(ScrollMode.FORWARD_ONLY) 
.setFetchSize(1) {
while (scrollableResults.next()) {
sink.send(sse.newEventBuilder().data(scrollableResults.get(0)).mediaType(APPLICATION_JSON_TYPE).build());
}
sink.close();
}
}
}

然后我这样调用这个Rest端点(通过httpie(:

> http :8080/fruits --stream
data: {"id":9996,"name":"applecfcdd592-1934-4f0e-a6a8-2f88fae5d14c"}
data: {"id":9997,"name":"apple7f5045a8-03bd-4bf5-9809-03b22069d9f3"}
data: {"id":9998,"name":"apple0982b65a-bc74-408f-a6e7-a165ec3250a1"}
data: {"id":9999,"name":"apple2f347c25-d0a1-46b7-bcb6-1f1fd5098402"}
data: {"id":10000,"name":"apple65d456b8-fb04-41da-bf07-73c962930629"}

希望这对你有帮助。

最新更新