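I have a bunch of worker threads that I want to execute sequentially, each after a given delay. I want to achieve the following behavior:
delay -> worker 1 -> delay -> worker 2 -> delay -> worker 3 -> ...
I came up with this solution: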
long delay = 5;
for (String value : values) {
    WorkerThread workerThread = new WorkerThread(value);
    executorService.schedule(workerThread, delay, TimeUnit.SECONDS);
    delay = delay + 5;
}
The executorService is created as follows:
private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
Is there any other way to achieve this in Java using ExecutorService?
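Looking at your question, I came up with another solution. Assume values is a queue that can change while tasks are running. I modified your WorkerThread slightly and added a callback object to it, so each worker schedules the next one when it finishes. Hope this helps.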
private final Queue<String> values = new LinkedList<>();
private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();

private void start() {
    // Each task is scheduled only when the previous one completes,
    // so the delay stays constant instead of growing by 5 each time.
    final long delay = 5;
    String value = values.poll();
    if (value != null) {
        WorkerThread workerThread = new WorkerThread(value, new OnCompleteCallback() {
            @Override
            public void complete() {
                String valueToProcessNext = values.poll();
                if (valueToProcessNext != null) {
                    // Reuse this callback so the chain continues until the queue is empty.
                    executorService.schedule(new WorkerThread(valueToProcessNext, this), delay, TimeUnit.SECONDS);
                }
            }
        });
        executorService.schedule(workerThread, delay, TimeUnit.SECONDS);
    }
}
class WorkerThread implements Runnable {
    private final String value;
    private final OnCompleteCallback callback;

    WorkerThread(String value, OnCompleteCallback callback) {
        this.value = value;
        this.callback = callback;
    }

    @Override
    public void run() {
        try {
            System.out.println(value);
        } finally {
            // Always notify, even if the work throws, so the next task still gets scheduled.
            callback.complete();
        }
    }
}

interface OnCompleteCallback {
    void complete();
}
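The same "delay, run, delay, run" pattern can also be sketched without a callback by letting scheduleWithFixedDelay drain the queue, since that method measures the delay from the end of one run to the start of the next. This is only a minimal sketch under a few assumptions: the class name FixedDelayDrain, the method drain, and the processed list are made up for illustration, the delay is in milliseconds to keep the demo short, and adding to a list stands in for the real work.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the original answer.
class FixedDelayDrain {

    // Handles one queued value per run, waiting delayMillis before the first
    // run and between the end of each run and the start of the next.
    static List<String> drain(List<String> input, long delayMillis) {
        Queue<String> values = new ConcurrentLinkedQueue<>(input);
        List<String> processed = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

        executor.scheduleWithFixedDelay(() -> {
            String value = values.poll();
            if (value == null) {
                done.countDown();      // queue is empty: signal the caller
                return;
            }
            processed.add(value);      // stand-in for the real work
        }, delayMillis, delayMillis, TimeUnit.MILLISECONDS);

        try {
            done.await();              // block until the queue has been drained
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();    // stop the periodic task
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(drain(List.of("a", "b", "c"), 100)); // prints [a, b, c]
    }
}
```

One trade-off of this variant: the periodic task keeps firing until it is cancelled, so it needs an explicit stop condition (here, the empty queue), whereas the callback version simply stops scheduling.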
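If ExecutorService has to be used, nothing other than your solution comes to mind. However, you may find CompletableFuture more useful, because it provides similar behavior but with the delay measured from the completion of the previous task rather than from a scheduled start time.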
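CompletableFuture<Void> completableFuture = CompletableFuture.completedFuture(null);
String[] values = new String[]{"a", "b", "c"};
for (String value : values) {
    // Reassign so that each stage is chained onto the previous one
    // rather than onto the initial, already completed future.
    completableFuture = completableFuture
            .thenRunAsync(() -> {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the interrupt flag
                }
            })
            .thenRun(() -> System.out.println(value));
}
// Waits for the last stage; throws InterruptedException or ExecutionException.
completableFuture.get();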
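On Java 9 or later, the sleeping stage can probably be avoided with CompletableFuture.delayedExecutor, which returns an Executor that starts a task only after the given delay. Below is a minimal sketch of that idea, not a drop-in replacement: the class name DelayedChain, the method runAll, and the processed list are invented for illustration, and the delay is in milliseconds to keep the demo quick.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the original answer.
class DelayedChain {

    // Runs one stage per value; each stage starts delayMillis after the
    // previous stage has completed, without blocking a thread in sleep().
    static List<String> runAll(List<String> values, long delayMillis) {
        List<String> processed = new CopyOnWriteArrayList<>();
        Executor delayed = CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS);

        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
        for (String value : values) {
            // The delay timer starts when the previous stage finishes,
            // because that is when the action is handed to the executor.
            chain = chain.thenRunAsync(() -> processed.add(value), delayed);
        }
        chain.join(); // wait for the last stage; wraps failures in CompletionException
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(runAll(List.of("a", "b", "c"), 100)); // prints [a, b, c]
    }
}
```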
You can put a DelayQueue between each pair of workers, and decorate your workers with this class:
public class DelayedTask implements Runnable {
    private final Runnable task;
    private final DelayQueue<Delayed> waitQueue;
    private final DelayQueue<Delayed> followerQueue;

    public DelayedTask(Runnable task, DelayQueue<Delayed> waitQueue, DelayQueue<Delayed> followerQueue) {
        this.task = Objects.requireNonNull(task);
        this.waitQueue = Objects.requireNonNull(waitQueue);
        this.followerQueue = followerQueue;
    }

    @Override
    public void run() {
        try {
            // Blocks until the delay placed in this task's queue has expired.
            waitQueue.take();
            try {
                task.run();
            } finally {
                // Start the delay for the next task once this one has finished.
                if (followerQueue != null) {
                    followerQueue.add(new Delay(3, TimeUnit.SECONDS));
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
And a simple Delayed implementation:
class Delay implements Delayed {
    // Absolute deadline on the System.nanoTime() clock.
    private final long nanos;

    Delay(long amount, TimeUnit unit) {
        this.nanos = TimeUnit.NANOSECONDS.convert(amount, unit) + System.nanoTime();
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(nanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        // Compare remaining delays; comparing the absolute deadline with a remaining delay would mix units.
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}
This allows the following usage:
ExecutorService executorService = Executors.newFixedThreadPool(1);
// ....
DelayQueue<Delayed> waitQueue = new DelayQueue<>();
waitQueue.add(new Delay(3, TimeUnit.SECONDS));
for (String value : values) {
    // Each task waits on its own queue and arms the queue of the task that follows it.
    DelayQueue<Delayed> followerQueue = new DelayQueue<>();
    executorService.submit(new DelayedTask(new WorkerThread(value), waitQueue, followerQueue));
    waitQueue = followerQueue;
}
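To see why take() on a DelayQueue works as the "wait" step, here is a small self-contained sketch. It does not reuse the classes above: Pause is a hypothetical stand-in for the Delay class, and DelayQueueDemo and blockedMillis are names made up for this demo. It only measures how long take() blocks for a single delayed element.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original answer.
class DelayQueueDemo {

    // Minimal Delayed element: expires once its deadline has passed.
    static final class Pause implements Delayed {
        private final long deadlineNanos;

        Pause(long amount, TimeUnit unit) {
            this.deadlineNanos = System.nanoTime() + unit.toNanos(amount);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            // Order by remaining delay, shortest first.
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // Returns how many milliseconds take() blocked for one element with the given delay.
    static long blockedMillis(long delayMillis) {
        long start = System.nanoTime();
        DelayQueue<Pause> queue = new DelayQueue<>();
        queue.add(new Pause(delayMillis, TimeUnit.MILLISECONDS));
        try {
            queue.take(); // returns only after the element's delay has expired
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        System.out.println("take() blocked for about " + blockedMillis(200) + " ms");
    }
}
```

Because take() does not return until the head element has expired, a task that calls it first cannot start its real work until the previous task has armed the queue and the delay has elapsed.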