在不使用"parallel"运算符的情况下使顺序处理异步



我现在正在研究的Java方法:

Observable<Map.Entry<String, ConstituentInfo>> obs = Observable.from(constituents.entrySet());
Subscriber<Map.Entry<String, ConstituentInfo>> sub = new Subscriber<Map.Entry<String, ConstituentInfo>>(){
    String securityBySymbol, company, symbol, companyName, marketPlace, countryName, tier, tierId;
    ConstituentInfo constituent;
    Integer compId;
    @Override
    public void onNext(Map.Entry<String, ConstituentInfo> entry) {
         logger.info("blah blah test");
    }
    @Override
    public void onCompleted() {
         logger.info("completed successfully");
    }
    @Override
    public void onError(Throwable throwable) {
         logger.error(throwable.getMessage());
         throwable.printStackTrace();
    }
};
obs.observeOn(Schedulers.io()).subscribe(sub);

该方法本质上处理Map.Entry中的每个条目,但这似乎是按顺序处理的(同一线程)。如何在不使用"并行"运算符(即并发处理条目)的情况下使此过程异步?我尝试运行上面的代码,但缺少一些结果(有些结果未正确处理)。

您的可观察量在报告时在主线程上运行,并且 Subscriber 方法将由 Schedulers.io() 给出的一个工作线程调用,这就是它出现在一个线程上的原因。您的代码并不指示订阅者在日志记录之外正在执行任何实际工作,因此此处无需异步执行

任何操作。

也许你打算这样做?

obs.subscribeOn(Schedulers.io()).subscribe(sub);

在并行处理方面,如果您的最后一行是这样的:

obs.doOnNext(entry -> doWork(entry)).observeOn(Schedulers.io()).subscribe(sub);

然后,您可以像这样异步完成doWork位:

int numProcessor = Runtime.getRuntime().availableProcessor();obs.buffer(Math.max(1, constituents.size()/numProcessor))   .flatMap(list -> Observable.from(list)                   .doOnNext(entry -> doWork(entry)                   .subscribeOn(Schedulers.computation())   .observeOn(Schedulers.io())   .subscribe(sub);

您需要按处理器缓冲工作,否则可能会发生大量线程上下文切换。

我不确定你为什么错过结果。如果你有一个可重复的测试用例,那么将代码粘贴到这里,或者把它作为一个问题报告给 github 上的 RxJava。

相关内容

  • 没有找到相关文章

最新更新