我尝试在rx风格中使用有限的消费者计数(例如2)执行长时间操作。
问题是如何确保只有两个消费者在同一时间执行其工作。
让我们有一个消费者接口:
public interface Consumer{
//Take a lot of time
Observable<Result> doJob(Task task);
}
和队列类:
public class Queue {
public void enqueue(Task task){
//TODO: enqueue task and do it with limited count of Consumers
}
}
如何组织任务队列和消费者的工作?
使用调度程序。将并发工作限制为两个任务:
Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(2));
events.flatMap(x ->
Observable.fromCallable(() -> process(x))
.subscribeOn(scheduler))
.subscribe(subscriber);