我希望编写一些将处理事件的并发代码。此处理可能需要很长时间。
当该事件正在处理时,它应该记录传入的事件,然后在它可以再次自由运行时处理最后一个传入的事件。(其他事件可以扔掉)。这有点像 FILO 队列,但我只需要在队列中存储一个元素。
理想情况下,我想将我的新执行器插入到如下所示的事件处理架构中。
public class AsyncNode<I, O> extends AbstractNode<I, O> {
private static final Logger log = LoggerFactory.getLogger(AsyncNode.class);
private Executor executor;
public AsyncNode(EventHandler<I, O> handler, Executor executor) {
super(handler);
this.executor = executor;
}
@Override
public void emit(O output) {
if (output != null) {
for (EventListener<O> node : children) {
node.handle(output);
}
}
}
@Override
public void handle(final I input) {
executor.execute(new Runnable() {
@Override
public void run() {
try{
emit(handler.process(input));
}catch (Exception e){
log.error("Exception occured whilst processing input." ,e);
throw e;
}
}
});
}
}
我也不会这样做。我会对您要处理的事件有一个原子引用,并添加一个任务以破坏性方式处理它。
final AtomicReference<Event> eventRef =
public void processEvent(Event event) {
eventRef.set(event);
executor.submit(new Runnable() {
public vodi run() {
Event e = eventRef.getAndSet(null);
if (e == null) return;
// process event
}
}
}
这只会在执行程序空闲时处理下一个事件,而不自定义执行程序或队列(可用于其他事情)
这也扩展到具有键控事件,即您希望处理键的最后一个事件。
我认为关键是您需要应用于Executor
的"丢弃策略"。如果您只想处理最新任务,则需要队列大小为 1 和丢弃最旧任务的"丢弃策略"。下面是一个将执行此操作的执行程序的示例
Executor latestTaskExecutor = new ThreadPoolExecutor(1, 1, // Single threaded
30L, TimeUnit.SECONDS, // Keep alive, not really important here
new ArrayBlockingQueue<>(1), // Single element queue
new ThreadPoolExecutor.DiscardOldestPolicy()); // When new work is submitted discard oldest
然后,当您的任务进来时,只需将它们提交给此执行器,如果已经有一个排队的作业,它将被替换为新的作业
latestTaskExecutor.execute(() -> doUpdate()));
这是一个示例应用程序,显示此功能
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class LatestUpdate {
private static final Executor latestTaskExecutor = new ThreadPoolExecutor(1, 1, // Single threaded
30L, TimeUnit.SECONDS, // Keep alive, not really important here
new ArrayBlockingQueue<>(1), // Single element queue
new ThreadPoolExecutor.DiscardOldestPolicy()); // When new work is submitted discard oldest
private static final AtomicInteger counter = new AtomicInteger(0);
private static final Random random = new Random();
public static void main(String[] args) {
LatestUpdate latestUpdate = new LatestUpdate();
latestUpdate.run();
}
private void doUpdate(int number) {
System.out.println("Latest number updated is: " + number);
try { // Wait a random amount of time up to 5 seconds. Processing the update takes time...
Thread.sleep(random.nextInt(5000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void run() {
// Updates a counter every second and schedules an update event
Thread counterUpdater = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(1000L); // Wait one second
} catch (InterruptedException e) {
e.printStackTrace();
}
counter.incrementAndGet();
// Schedule this update will replace any existing update waiting
latestTaskExecutor.execute(() -> doUpdate(counter.get()));
System.out.println("New number is: " + counter.get());
}
});
counterUpdater.start(); // Run the thread
}
}
这也涵盖了 GUI 的情况,一旦更新停止到达,您希望 GUI 最终与收到的最后一个事件保持一致。
public class LatestTaskExecutor implements Executor {
private final AtomicReference<Runnable> lastTask =new AtomicReference<>();
private final Executor executor;
public LatestTaskExecutor(Executor executor) {
super();
this.executor = executor;
}
@Override
public void execute(Runnable command) {
lastTask.set(command);
executor.execute(new Runnable() {
@Override
public void run() {
Runnable task=lastTask.getAndSet(null);
if(task!=null){
task.run();
}
}
});
}
}
@RunWith( MockitoJUnitRunner.class )
public class LatestTaskExecutorTest {
@Mock private Executor executor;
private LatestTaskExecutor latestExecutor;
@Before
public void setup(){
latestExecutor=new LatestTaskExecutor(executor);
}
@Test
public void testRunSingleTask() {
Runnable run=mock(Runnable.class);
latestExecutor.execute(run);
ArgumentCaptor<Runnable> captor=ArgumentCaptor.forClass(Runnable.class);
verify(executor).execute(captor.capture());
captor.getValue().run();
verify(run).run();
}
@Test
public void discardsIntermediateUpdates(){
Runnable run=mock(Runnable.class);
Runnable run2=mock(Runnable.class);
latestExecutor.execute(run);
latestExecutor.execute(run2);
ArgumentCaptor<Runnable> captor=ArgumentCaptor.forClass(Runnable.class);
verify(executor,times(2)).execute(captor.capture());
for (Runnable runnable:captor.getAllValues()){
runnable.run();
}
verify(run2).run();
verifyNoMoreInteractions(run);
}
}
这个答案是DD的修改版本,它最大限度地减少了多余任务的提交。
原子引用用于跟踪最新事件。将自定义任务提交到队列以潜在地处理事件,只有读取最新事件的任务才能真正继续并在清除对 null 的原子引用之前执行有用的工作。当其他任务有机会运行并且发现没有事件可供处理时,它们什么都不做,默默地消失了。通过跟踪队列中可用任务的数量,可以避免提交多余的任务。如果队列中至少有一个任务待处理,我们可以避免提交该任务,因为当已经排队的任务取消排队时,将处理该事件。
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
public class EventExecutorService implements Executor {
private final Executor executor;
// the field which keeps track of the latest available event to process
private final AtomicReference<Runnable> latestEventReference = new AtomicReference<>();
private final AtomicInteger activeTaskCount = new AtomicInteger(0);
public EventExecutorService(final Executor executor) {
this.executor = executor;
}
@Override
public void execute(final Runnable eventTask) {
// update the latest event
latestEventReference.set(eventTask);
// read count _after_ updating event
final int activeTasks = activeTaskCount.get();
if (activeTasks == 0) {
// there is definitely no other task to process this event, create a new task
final Runnable customTask = new Runnable() {
@Override
public void run() {
// decrement the count for available tasks _before_ reading event
activeTaskCount.decrementAndGet();
// find the latest available event to process
final Runnable currentTask = latestEventReference.getAndSet(null);
if (currentTask != null) {
// if such an event exists, process it
currentTask.run();
} else {
// somebody stole away the latest event. Do nothing.
}
}
};
// increment tasks count _before_ submitting task
activeTaskCount.incrementAndGet();
// submit the new task to the queue for processing
executor.execute(customTask);
}
}
}
虽然我喜欢 James Mudd 的解决方案,但它仍然会在上一个任务运行时排队第二个任务,这可能是不可取的。如果你想在上一个任务没有完成的情况下总是忽略/丢弃到达任务,你可以像这样做一些包装器:
public class DiscardingSubmitter {
private final ExecutorService es = Executors.newSingleThreadExecutor();
private Future<?> future = CompletableFuture.completedFuture(null); //to avoid null check
public void submit(Runnable r){
if (future.isDone()) {
future = es.submit(r);
}else {
//Task skipped, log if you want
}
}
}