仅执行程序/队列处理最后一个已知任务



我希望编写一些将处理事件的并发代码。此处理可能需要很长时间。

当该事件正在处理时,它应该记录传入的事件,然后在它可以再次自由运行时处理最后一个传入的事件。(其他事件可以扔掉)。这有点像 FILO 队列,但我只需要在队列中存储一个元素。

理想情况下,我想将我的新执行器插入到如下所示的事件处理架构中。

public class AsyncNode<I, O> extends AbstractNode<I, O>  {
    private static final Logger log = LoggerFactory.getLogger(AsyncNode.class);
    private Executor executor;
    public AsyncNode(EventHandler<I, O> handler, Executor executor) {
        super(handler);
        this.executor = executor;
    }
    @Override
    public void emit(O output) {
        if (output != null) {
            for (EventListener<O> node : children) {
                node.handle(output);
            }
        }
    }
    @Override
    public void handle(final I input) {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                try{
                emit(handler.process(input));
                }catch (Exception e){
                    log.error("Exception occured whilst processing input." ,e);
                    throw e;
                }
            }
        });
    }

}

我也不会这样做。我会对您要处理的事件有一个原子引用,并添加一个任务以破坏性方式处理它。

final AtomicReference<Event> eventRef =
public void processEvent(Event event) {
   eventRef.set(event);
   executor.submit(new Runnable() {
       public vodi run() {
           Event e = eventRef.getAndSet(null);
           if (e == null) return;
           // process event
       }
   }
}

这只会在执行程序空闲时处理下一个事件,而不自定义执行程序或队列(可用于其他事情)

这也扩展到具有键控事件,即您希望处理键的最后一个事件。

我认为关键是您需要应用于Executor的"丢弃策略"。如果您只想处理最新任务,则需要队列大小为 1 和丢弃最旧任务的"丢弃策略"。下面是一个将执行此操作的执行程序的示例

Executor latestTaskExecutor = new ThreadPoolExecutor(1, 1, // Single threaded 
        30L, TimeUnit.SECONDS, // Keep alive, not really important here
        new ArrayBlockingQueue<>(1), // Single element queue
        new ThreadPoolExecutor.DiscardOldestPolicy()); // When new work is submitted discard oldest

然后,当您的任务进来时,只需将它们提交给此执行器,如果已经有一个排队的作业,它将被替换为新的作业

latestTaskExecutor.execute(() -> doUpdate()));

这是一个示例应用程序,显示此功能

import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class LatestUpdate {
    private static final Executor latestTaskExecutor = new ThreadPoolExecutor(1, 1, // Single threaded
            30L, TimeUnit.SECONDS, // Keep alive, not really important here
            new ArrayBlockingQueue<>(1), // Single element queue
            new ThreadPoolExecutor.DiscardOldestPolicy()); // When new work is submitted discard oldest
    private static final AtomicInteger counter = new AtomicInteger(0);
    private static final Random random = new Random(); 
    public static void main(String[] args) {
        LatestUpdate latestUpdate = new LatestUpdate();
        latestUpdate.run();
    }
    private void doUpdate(int number) {
        System.out.println("Latest number updated is: " + number);
        try { // Wait a random amount of time up to 5 seconds. Processing the update takes time...
            Thread.sleep(random.nextInt(5000));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    private void run() {
        // Updates a counter every second and schedules an update event
        Thread counterUpdater = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(1000L); // Wait one second
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                counter.incrementAndGet();
                // Schedule this update will replace any existing update waiting 
                latestTaskExecutor.execute(() -> doUpdate(counter.get()));
                System.out.println("New number is: " + counter.get());
            }
        });
        counterUpdater.start(); // Run the thread
    }
}

这也涵盖了 GUI 的情况,一旦更新停止到达,您希望 GUI 最终与收到的最后一个事件保持一致。

public class LatestTaskExecutor implements Executor {
    private final AtomicReference<Runnable> lastTask =new AtomicReference<>();
    private final Executor executor;
    public LatestTaskExecutor(Executor executor) {
        super();
        this.executor = executor;
    }
    @Override
    public void execute(Runnable command) {
        lastTask.set(command);
        executor.execute(new Runnable() {
            @Override
            public void run() {
                Runnable task=lastTask.getAndSet(null);
                if(task!=null){
                    task.run();
                }
            }
        });
    }
}
@RunWith( MockitoJUnitRunner.class )
public class LatestTaskExecutorTest {
    @Mock private Executor executor;
    private LatestTaskExecutor latestExecutor;
    @Before
    public void setup(){
        latestExecutor=new LatestTaskExecutor(executor);
    }
    @Test
    public void testRunSingleTask() {
        Runnable run=mock(Runnable.class);
        latestExecutor.execute(run);
        ArgumentCaptor<Runnable> captor=ArgumentCaptor.forClass(Runnable.class);
        verify(executor).execute(captor.capture());
        captor.getValue().run();
        verify(run).run();
    }
    @Test
    public void discardsIntermediateUpdates(){
        Runnable run=mock(Runnable.class);
        Runnable run2=mock(Runnable.class);
        latestExecutor.execute(run);
        latestExecutor.execute(run2);
        ArgumentCaptor<Runnable> captor=ArgumentCaptor.forClass(Runnable.class);
        verify(executor,times(2)).execute(captor.capture());
        for (Runnable runnable:captor.getAllValues()){
            runnable.run();
        }
        verify(run2).run();
        verifyNoMoreInteractions(run);
    }
}

这个答案是DD的修改版本,它最大限度地减少了多余任务的提交。

原子引用用于跟踪最新事件。将自定义任务提交到队列以潜在地处理事件,只有读取最新事件的任务才能真正继续并在清除对 null 的原子引用之前执行有用的工作。当其他任务有机会运行并且发现没有事件可供处理时,它们什么都不做,默默地消失了。通过跟踪队列中可用任务的数量,可以避免提交多余的任务。如果队列中至少有一个任务待处理,我们可以避免提交该任务,因为当已经排队的任务取消排队时,将处理该事件。

import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
public class EventExecutorService implements Executor {
    private final Executor executor;
    // the field which keeps track of the latest available event to process
    private final AtomicReference<Runnable> latestEventReference = new AtomicReference<>();
    private final AtomicInteger activeTaskCount = new AtomicInteger(0);
    public EventExecutorService(final Executor executor) {
        this.executor = executor;
    }
    @Override
    public void execute(final Runnable eventTask) {
        // update the latest event
        latestEventReference.set(eventTask);
        // read count _after_ updating event
        final int activeTasks = activeTaskCount.get();
        if (activeTasks == 0) {
            // there is definitely no other task to process this event, create a new task
            final Runnable customTask = new Runnable() {
                @Override
                public void run() {
                    // decrement the count for available tasks _before_ reading event
                    activeTaskCount.decrementAndGet();
                    // find the latest available event to process
                    final Runnable currentTask = latestEventReference.getAndSet(null);
                    if (currentTask != null) {
                        // if such an event exists, process it
                        currentTask.run();
                    } else {
                        // somebody stole away the latest event. Do nothing.
                    }
                }
            };
            // increment tasks count _before_ submitting task
            activeTaskCount.incrementAndGet();
            // submit the new task to the queue for processing
            executor.execute(customTask);
        }
    }
}

虽然我喜欢 James Mudd 的解决方案,但它仍然会在上一个任务运行时排队第二个任务,这可能是不可取的。如果你想在上一个任务没有完成的情况下总是忽略/丢弃到达任务,你可以像这样做一些包装器:

public class DiscardingSubmitter {
private final ExecutorService es = Executors.newSingleThreadExecutor();
private Future<?> future = CompletableFuture.completedFuture(null); //to avoid null check
public void submit(Runnable r){
    if (future.isDone()) {
        future = es.submit(r);
    }else {
        //Task skipped, log if you want
    }
}

}

相关内容

  • 没有找到相关文章

最新更新