如何在Spring reactor/web flux中基于一个公共属性连接两个发布者,并从中构建一个发布者



假设我有两个通量Flux<Class1>Flux<Class2>,Class1和Class2都有一个公共属性,比如"id"。

用例是基于公共属性"id"连接两个flux,并构造一个Flux<Tuple<Class1, Class2>>,类似于连接两个sql表。

-对于属性id,两个通量之间总是存在1比1的匹配。

-通量不会包含超过100个对象。

-通量不按id排序。

如何在Project Reactor/Spring web流量中实现这一点?

假设:

  • 这两个集合都不是很大(您可以将它们保存在内存中,而不会冒OOM问题的风险(
  • 它们不是按id排序的
  • 集合中的每个元素在另一个元素中都有对应的元素

首先,您应该让这些Class1Class2实现Comparable,或者至少准备一个比较器实现,您可以使用它来根据它们的id对它们进行排序。

然后您可以使用zip运算符:

Flux<Class1> flux1 = ...
Flux<Class2> flux2 = ...
Flux<Tuple2<Class1,Class2>> zipped = Flux.zip(flux1.sort(comparator1), flux2.sort(comparator2));

Tuple2是一个反应堆核心类,它允许您访问像这样的Tuple的每个元素

Tuple2<Class1,Class2> tuple = ...
Class1 klass1 = tuple.getT1();
Class2 klass2 = tuple.getT2();

在这种情况下,sort将缓冲所有元素,如果集合很大,这可能会导致内存/延迟问题。根据这些集合中的排序方式(假设排序不保证,但这些集合是批量插入的(,您也可以缓冲其中一些集合(使用window(,并对每个窗口进行排序(使用CCD11(。

当然,理想情况下,能够提取已经排序的两个数据将避免缓冲数据,并提高应用程序中的背压支持。

我认为这应该在以下约束下工作:

  • 第二个Flux需要向所有订阅者发出相同的元素,因为它被一次又一次地订阅
  • 这基本上相当于嵌套循环连接,因此对于大流量来说效率非常低
  • 第一CCD_ 13的每个元素在第二CCD_。

    flux1.flatMap(
    f1 -> flux2.filter(f2 -> f2.id.equals(f1.id)).take(1)) // take the first with matching id
    .map(f2 -> Tuple.of(f1,f2))) // convert to tuple.
    

在没有IDE的情况下写入。考虑伪代码

最新更新