Spring 反应式实现多次调用 db



我是一个新的反应式,并试图以有效的方式完成以下任务。我有一个表,其中包含每个用户的事件。我正在尝试获取使用最新类别过滤的给定用户的每个事件名称的最新行。

table structure
user id , category , event name, details , insert timestamp, event text( payload of event)

反应式方法 呼叫 1 到卡桑德拉

Mono<Event> latestCategory = repository.findByUserId(userId).sort().next(); // sort is by insert 
timestamp;

呼叫 2 至 Cassandra

Flux<Event> fluxEvents = repository.findByUserId(userId)
.groupBy(Event::name) //grouping by event name
.flatmap(grp -> {
grp.sort() // sorting for each event
grp.next().zipWith(latestCategory) // picking latest row for each event
.filter(eventWithLatestCategory -> 
eventWithLatestCategory.getT1().category.equals(eventWithLatestCategory.getT2().category) //filtering by each category
.map(Tuple2::getT1)// picking latest event row for latest category
};

从功能上讲,每件事都工作正常,但是我看到表中每一行都发生了数据库调用的问题。在命令式编程中,我可以通过一个数据库调用来实现它,然后应用上面的逻辑。如何在反应世界中做同样的事情?

enter code here
">

最轻触"的方法是只做:

Mono<Event> latestCategory = repository.findByUserId(userId).sort().next().cache();

。这意味着latestCategory只提取一次,然后缓存用于所有其他订阅。

不过,这可能不是最佳解决方案。

在这种情况下,就目前而言,您是在Flux本身中进行排序。这通常不是明智的,但如果你绝对需要这样做,那么你不妨这样做:

repository.findByUserId(userId).sort().collectList().map(eventList -> {
//Deal imperatively with a List<Event>
});

。然后,可以从单个数据库调用访问列表,您可以根据需要随机查询该列表。您实际上是在获得,而不是以这种方式失去性能,因为只有一个数据库调用 - 并且由于排序Flux在整个发布者完成之前永远无法输出任何内容,因此这和简单地收集到列表之间没有实质性区别。

然而,更好的方法是让 cassandra 进行底层排序,然后使用switchOnFirst()将第一个元素(您的最新类别(与发生的所有其他元素一起压缩:

repository.findByUserId(userId).switchOnFirst((signal, flux) ->
flux.map(val -> Tuples.of(signal.get(), val)) //(In real-world use, check the `flux` actually has a value first)
)
//...etc

这意味着:

  • 您只需要一个数据库查询;
  • 您可以被动地处理出现的每个值,而不必等待整个流结束;
  • 无需缓存发布者,如有必要,最好避免缓存。

最新更新