RxJava的生产者-消费者



我尝试在rx风格中使用有限的消费者计数(例如2)执行长时间操作。

问题是如何确保只有两个消费者在同一时间执行其工作。

让我们有一个消费者接口:

public interface Consumer{
    //Take a lot of time
    Observable<Result> doJob(Task task);
}

和队列类:

public class Queue {
    public void enqueue(Task task){
        //TODO: enqueue task and do it with limited count of Consumers
    }
}

如何组织任务队列和消费者的工作?

使用调度程序。将并发工作限制为两个任务:

Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(2));
events.flatMap(x ->
     Observable.fromCallable(() -> process(x))
               .subscribeOn(scheduler))
    .subscribe(subscriber);

相关内容

  • 没有找到相关文章

最新更新