假设我有两个通量Flux<Class1>
和Flux<Class2>
,Class1和Class2都有一个公共属性,比如"id"。
用例是基于公共属性"id"连接两个flux,并构造一个Flux<Tuple<Class1, Class2>>
,类似于连接两个sql表。
-对于属性id,两个通量之间总是存在1比1的匹配。
-通量不会包含超过100个对象。
-通量不按id排序。
如何在Project Reactor/Spring web流量中实现这一点?
假设:
- 这两个集合都不是很大(您可以将它们保存在内存中,而不会冒OOM问题的风险(
- 它们不是按id排序的
- 集合中的每个元素在另一个元素中都有对应的元素
首先,您应该让这些Class1
、Class2
实现Comparable
,或者至少准备一个比较器实现,您可以使用它来根据它们的id对它们进行排序。
然后您可以使用zip
运算符:
Flux<Class1> flux1 = ...
Flux<Class2> flux2 = ...
Flux<Tuple2<Class1,Class2>> zipped = Flux.zip(flux1.sort(comparator1), flux2.sort(comparator2));
Tuple2
是一个反应堆核心类,它允许您访问像这样的Tuple的每个元素
Tuple2<Class1,Class2> tuple = ...
Class1 klass1 = tuple.getT1();
Class2 klass2 = tuple.getT2();
在这种情况下,sort
将缓冲所有元素,如果集合很大,这可能会导致内存/延迟问题。根据这些集合中的排序方式(假设排序不保证,但这些集合是批量插入的(,您也可以缓冲其中一些集合(使用window
(,并对每个窗口进行排序(使用CCD11(。
当然,理想情况下,能够提取已经排序的两个数据将避免缓冲数据,并提高应用程序中的背压支持。
我认为这应该在以下约束下工作:
- 第二个
Flux
需要向所有订阅者发出相同的元素,因为它被一次又一次地订阅 - 这基本上相当于嵌套循环连接,因此对于大流量来说效率非常低
-
第一CCD_ 13的每个元素在第二CCD_。
flux1.flatMap( f1 -> flux2.filter(f2 -> f2.id.equals(f1.id)).take(1)) // take the first with matching id .map(f2 -> Tuple.of(f1,f2))) // convert to tuple.
在没有IDE的情况下写入。考虑伪代码