项目反应器 - 合并两个有状态的发布者并发出结果



我想用反应堆设计一个处理管道,它的作用如下。

我有两个输入发布服务器orderEntries(冷(和hotBroadcasts(热(。我想将hotBroadcasts发出的项目聚合到(可变的(内存数据结构中 - 比如一个HashMap - 对于orderEntries中的每个项目,我想从该 Map 中选择一个相应的元素,创建结果项目并推送到下游订阅者。

来自hotBroadcasts的事件以任意顺序出现,这就是为什么我想将它们存储在内存中以便于检索。

从概念上讲,它应该像这样工作:

       orderEntries                      hotBroadcasts
           |                                   | 
           |                                   | 
           |                                   | 
                                              / 
            ----------------> <----------------
                   (aggregate events from hotBroadcasts)     
                             |
                             |
                        resulting item
                             |
                             |
                            /
                      downstream subcriber  

到目前为止,我设法用 ReplayProcessor 勾勒出一个解决方案,由 Kotlin 伪示例说明:

val orderEntries = Flux.interval(Duration.of(1, ChronoUnit.SECONDS))
val hotBroadcasts = ReplayProcessor.create<String>(1000, false)
orderEntries.concatMap { entryId ->
    // problematic filter - skims through all that ReplayProcessor has cached
    hotBroadcasts.filter { broadcastId ->
        "Broadcast:$entryId" == broadcastId
    }
    .take(1)
    .map { "EntryId: $entryId, BroadcastId: $it" }
}.subscribe { LOG.info(it) }
Flux.interval(Duration.of(200, ChronoUnit.MILLIS))
        .concatMap { Flux.just(it, it - 100000) }
        .map { "Broadcast:$it" }
        .subscribe {
            hotBroadcasts.onNext(it)
        }

这里的问题是过滤hotBroadcast会浏览orderEntries中每个项目的所有项目。因此,我的想法是将它们存储在HashMap中。

谁能指出我正确的方向?

可以聚合来自两个不同发布者的消息的对象是具有 2 个参数的异步过程调用。这样的调用可以在 rxjava 中使用 io.reactivex.Single.zip(SingleSource arg1, SingleSource arg2, BiFunction func) 构造,也可以在纯 Java 中使用 java.util.concurrent.CompletableFuture.thenCombine(CompletionStage arg2, BiFunction func) 构造。

你需要一个特殊的HashMap来保存异步过程调用。当第一次使用给定标签访问此 HashMap 时,应自动创建调用。

所以一个公共者调用

asyncProc=callMap.get(label); // asyncProc is created and stored with the label as a key
asyncProc.arg1.complete(value);

和其他公共者调用

asyncProc=callMap.get(label); // previously created instance returned
asyncProc.arg2.complete(value);

在两个发布者都提供了其参数后,将执行异步过程。

最新更新