使用rxJava进行可观察的分叉



我想使用反应式编程来处理数据记录。

我需要先根据一些规则过滤记录。

  • 只有订阅服务器才会使用错误记录
  • 良好的记录将被更多的处理器处理
| -> RejectedRecordsSubscriber
|
RecordPublisher ---> RecordMatcherProcessor ---> RecordProcessor ---> ...

我对使用2个filters((运算符进行拆分感到不舒服,因为筛选过程相当昂贵。

我宁愿过滤一次并将记录发布给正确的订阅者。

rxJava是如何实现的?使用groupBy运算符是唯一的方法吗?

注意:在我使用JavaFlowneneneba API编写的POC中,订阅服务器的对象类型是已知的,因此我可以发布到一个或另一个订阅服务器。

您可以使用Subject来处理它。

示例代码:

import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.subjects.PublishSubject;
import java.util.concurrent.TimeUnit;
public class Test2 {
public static void main(String[] args) {
Observable<Long> recordPublisher = Observable.interval(1, TimeUnit.SECONDS);
PublishSubject<Long> goodRecordSubject = PublishSubject.create();
PublishSubject<Long> badRecordSubject = PublishSubject.create();
//Record Match Process
recordPublisher.subscribe(i -> {
if (i % 2 == 0) {
goodRecordSubject.onNext(i);
} else {
badRecordSubject.onNext(i);
}
});
//Now deal with the good record;
goodRecordSubject.subscribe(
i -> System.out.println("get good record: " + i)
);
//Deal with the bad record;
badRecordSubject.subscribe(
i -> System.out.println("get bad record: " + i)
);
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}

最新更新