假设我们有一个这样的工人列表:
List<Worker> workers = new ArrayList<>();
workers.add(new Worker(1));
workers.add(new Worker(2));
workers.add(new Worker(3));
workers.add(new Worker(4));
workers.add(new Worker(5));
我想找到第一个完成工作的工人,所以:
Worker first = workers.parallelStream().filter(Worker::finish).findFirst().orElse(null);
但是有一个问题,我不想等所有工人都完成工作再找到第一个,而是第一个工人一完成工作就找到!
public class Test {
public static void main(String[] args) {
List<Worker> workers = new ArrayList<>();
workers.add(new Worker(1));
workers.add(new Worker(2));
workers.add(new Worker(3));
workers.add(new Worker(4));
workers.add(new Worker(5));
Worker first = workers.parallelStream().filter(Worker::finish).findFirst().orElse(null);
if (first != null) {
System.out.println("id : " + first.id);
}
}
static class Worker {
int id;
Worker(int id) {
this.id = id;
}
boolean finish() {
int t = id * 1000;
System.out.println(id + " -> " + t);
try {
Thread.sleep(t);
} catch (InterruptedException ignored) {
}
return true;
}
}
}
有没有办法使用 java.util.Stream
来实现它?
谢谢。
你似乎对Stream
有一个严重的误解。 Stream
并不意味着启动工人。事实上,如果你使用findFirst
可能会发生它不启动工作线程,而是第一个工作线程。因此,它也不会等待"所有工作线程完成",而只会等待当前挂起的线程。但是,由于您的流相当小,因此可能所有工作线程都已启动,因为您的环境中有尽可能多的可用线程。但这不是一个有保证的行为。
请注意,如果您使用顺序流而不是并行流,它肯定会只处理第一项(因为它返回true
(,而不处理其他项目。但是,由于流实现无法预测该结果,它将尊重您通过并行执行"加速"操作的请求,并且可能会使用更多线程提前开始处理更多项目。
当您使用 finish
方法作为 Stream 的筛选器时,这意味着为了评估特定 Worker 的筛选器谓词,Worker 必须完成其工作。
但是,当您将此代码作为并行流运行时,筛选器可能会同时应用于多个工作线程,在这种情况下,第一个完成的筛选器将为您提供输出。但是,您无法控制并行流将使用的线程数。它可能会决定应该在同一线程上处理某些 Worker ,在这种情况下,根本不会处理其中一些 Worker (因为您的终端操作只需要一个 Worker 完成其处理(。
因此,如果您的目标是同时为所有工作线程执行finish
,则不能使用流(甚至不能使用并行流(。
这是一个老问题,但在这里找到了一些不错的解决方案:https://winterbe.com/posts/2015/04/07/java8-concurrency-tutorial-thread-executor-examples/
在"调用任何">下:
批量提交可调用对象的另一种方法是 invokeAny(( 方法 它的工作方式与 invokeAll(( 略有不同。而不是返回 此方法阻止的未来对象,直到第一个可调用对象终止 并返回该可调用对象的结果。
将流更改为可调用对象的集合。看起来真的很干净。
您可以尝试使用 Reactive Extensions for Java ( RxJava
( 中的Observable
,而不是使用 Stream
。下面的示例代码。
public class Example {
public static void main(String[] args) {
Maybe<Worker> workerResult = Observable.fromArray(Worker.run(1), Worker.run(2), Worker.run(3), Worker.run(4), Worker.run(5))
.flatMap(worker -> (Observable<Worker>) worker)
.firstElement();
workerResult.subscribe(onNext -> System.out.println("First worker [" + onNext.toString() + "]"));
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Worker {
private int id;
static Observable run(int id) { return Observable.just(new Worker(id)).observeOn(Schedulers.computation()).doOnNext(Worker::process); }
private Worker(int id) { this.id = id; }
public void process() {
try {
Thread.sleep(new Random().nextInt(2000));
} catch (InterruptedException e) {
System.out.println(String.format("[%s] Thread interrupted [%s]", Thread.currentThread(), id));
}
System.out.println(String.format("[%s] Worker [%s]", Thread.currentThread(), id));
}
public String toString() { return "Worker [" + id + "]"; }
}
示例输出:
[Thread[RxComputationThreadPool-2,5,main]] Worker [2]
[Thread[RxComputationThreadPool-1,5,main]] Thread interrupted [1]
[Thread[RxComputationThreadPool-1,5,main]] Worker [1]
[Thread[RxComputationThreadPool-4,5,main]] Thread interrupted [4]
[Thread[RxComputationThreadPool-3,5,main]] Thread interrupted [3]
[Thread[RxComputationThreadPool-3,5,main]] Worker [3]
[Thread[RxComputationThreadPool-4,5,main]] Worker [4]
First worker [Worker [2]]