不断将可运行的任务提交到 ExecutorService,直到工作完成,获取 java.util.concurrent.



我的多线程类应该对类ClassA的许多对象执行三个操作 -operation1operation2operation3,其中每种类型的操作都依赖于前面的操作。为此,我尝试使用许多BlockingQueueExecutorService来实现生产者-消费者模式。

final ExecutorService executor = ForkJoinPool.commonPool();
final BlockingQueue<ClassA> operationOneQueue = new ArrayBlockingQueue<>(NO_OF_CLASS_A_OBJECTS);
final BlockingQueue<ClassA> operationTwoQueue = new ArrayBlockingQueue<>(NO_OF_CLASS_A_OBJECTS);
final BlockingQueue<ClassA> operationThreeQueue = new ArrayBlockingQueue<>(NO_OF_CLASS_A_OBJECTS);
final BlockingQueue<ClassA> resultQueue = new ArrayBlockingQueue<>(NO_OF_CLASS_A_OBJECTS);

操作实现方式如下:

void doOperationOne() throws InterruptedException {
ClassA objectA = operationOneQueue.take();
objectA.operationOne();
operationTwoQueue.put(objectA);
}

其中每种类型的操作都有自己相应的方法,具有"自己的"队列内和队列外。每个操作方法都对ClassA对象调用相应的方法。该方法doOperationThreeClassA对象放入resultQueue中,这意味着它们已被完全处理。

首先,我用所有要操作ClassA对象填充operationOneQueue。然后,我尝试将可执行任务分配给ExecutorService,如下所示:

while (resultQueue.size() < NO_OF_CLASS_A_OBJECTS) {
executor.execute(() -> {
try {
doOperationOne();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
executor.execute(() -> {
try {
doOperationTwo();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
executor.execute(() -> {
try {
doOperationThree();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
executor.shutdown();

运行我的程序,我得到一个java.util.concurrent.RejectedExecutionException.

Operation1: ClassA object 0
Operation2: ClassA object 0
Operation1: ClassA object 1
Operation3: ClassA object 0
....
Operation1: ClassA object 46
Operation2: ClassA object 45
Operation3: ClassA object 45
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Queue capacity exceeded
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.growArray(ForkJoinPool.java:912)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.lockedPush(ForkJoinPool.java:867)
at java.base/java.util.concurrent.ForkJoinPool.externalPush(ForkJoinPool.java:1911)
at java.base/java.util.concurrent.ForkJoinPool.externalSubmit(ForkJoinPool.java:1930)
at java.base/java.util.concurrent.ForkJoinPool.execute(ForkJoinPool.java:2462)
at concurrent.operations.Program1.main(Program1.java:96)

我做错了什么?如何在不使线程池过度饱和的情况下实现这一点?

编辑:完全披露 - 这是有一些要求的家庭作业。1. 我必须使用ForkJoinPool.commonPool(),不能自己设置线程数, 2.我必须使用消费者-生产者模式,以及 3.我一定不能修改ClassA.

我真的很喜欢做并发的东西,所以我确实尝试过写它。我确实使用了 a( 默认情况下在ForkJoinPool.commonPool中运行的CompletableFuture,b( 使实际处理变得非常简单:

while (true) {
final ClassA nextOperation = queue.take();
CompletableFuture.runAsync(nextOperation::operationOne)
.thenRun(nextOperation::operationTwo)
.thenRun(nextOperation::operationThree)
.thenRun(() -> resultQueue.add(nextOperation));
}

这将从队列中获取ClassA对象,并按顺序同时执行其所有操作。

您确实遗漏了任务的来源,以及是否需要使用者终止。通常你不想这样做,这确实使事情变得更加复杂。

private static final int COUNT = 10;
private static final Random RANDOM = new Random();
public static void main(String[] args) throws ExecutionException, InterruptedException {
BlockingQueue<ClassA> runnables = new ArrayBlockingQueue<>(COUNT);
BlockingQueue<ClassA> finished = new ArrayBlockingQueue<>(COUNT);
// start producer
ExecutorService createTaskExecutor = Executors.newSingleThreadExecutor();
createTaskExecutor.submit(() -> fillQueue(runnables));
// wait for all consumer tasks to finish
while (finished.size() != COUNT) {
try {
// we need to poll instead of waiting forever
// because the last tasks might still be running
// while there are no others to add anymore
// so we need to check again if all have finished in the meantime
final ClassA nextOperation = runnables.poll(2, TimeUnit.SECONDS);
if (nextOperation != null) {
CompletableFuture.runAsync(nextOperation::operationOne)
.thenRun(nextOperation::operationTwo)
.thenRun(nextOperation::operationThree)
.thenRun(() -> finished.add(nextOperation));
}
} catch (InterruptedException e) {
System.err.println("exception while retrieving next operation");
// we will actually need to terminate now, or probably never will
throw e;
}
}
System.out.printf("finished tasks (%d):%n", finished.size());
for (ClassA classA : finished) {
System.out.printf("finished task %d%n", classA.designator);
}
createTaskExecutor.shutdown();
}
private static void fillQueue(BlockingQueue<ClassA> runnables) {
// start thread filling the queue at random
for (int i = 0; i < COUNT; i++) {
runnables.add(new ClassA(i));
try {
Thread.sleep(RANDOM.nextInt(1_000));
} catch (InterruptedException e) {
System.err.println("failed to add runnable");
}
}
}

由于您没有提供ClassA,我使用了这个。它包含一个标识符,因此您可以跟踪哪个标识符在什么时间运行。

class ClassA {
private static final Random RANDOM = new Random();
public final int designator;
public ClassA(int i) {
designator = i;
}
public void operationOne() {
System.out.printf("%d: operation 1%n", designator);
sleep();
}
public void operationTwo() {
System.out.printf("%d: operation 2%n", designator);
sleep();
}
public void operationThree() {
System.out.printf("%d: operation 3%n", designator);
sleep();
}
private static void sleep() {
try {
Thread.sleep(RANDOM.nextInt(5_000));
} catch (InterruptedException e) {
System.err.println("interrupted while executing task");
}
}
}

最新更新