按顺序处理异步事件和发布结果



我的目标是按顺序发布异步事件,这些事件也按顺序到达并需要任意时间进行处理。所以下面是我目前的实现,仅使用 waitnotify . MyThread处理事件,按 id 将结果放入哈希表中,并在按顺序发布此事件之前通知线程Scheduler如果它被阻止。

使用java.util.concurrent包实现此功能的更好、更简洁的方法是什么?

import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;

public class AsyncHandler {
    private final Map<Integer, Object> locks = new ConcurrentHashMap<Integer, Object>();
    private final Map<Integer, Result> results = new ConcurrentHashMap<Integer, Result>();
    private static final Random rand = new Random();
    public AsyncHandler () {
        new Scheduler(this, locks, results).start();
    }
    public void handleEvent(Event event) {
        System.out.println("handleEvent(" + event.id + ")");
        new MyThread(this, event, locks, results).start();
    }
    public Result processEvent (Event event) {
        System.out.println("processEvent(" + event.id + ")");
        locks.put(event.id, new Object());
        try {
            Thread.sleep(rand.nextInt(10000));
        } catch (InterruptedException e) {
            System.out.println(e);
        }
        return new Result(event.id);
    }
    public void postProcessEvent (Result result) {
        System.out.println(result.id);
    }
    public static void main (String[] args) {
        AsyncHandler async = new AsyncHandler();
        for (int i = 0; i < 100; i++) {
            async.handleEvent(new Event(i));
        }
    }
}
class Event {
    int id;
    public Event (int id) {
        this.id = id;
    }
}
class Result {
    int id;
    public Result (int id) {
        this.id = id;
    }
}
class MyThread extends Thread {
    private final Event event;
    private final Map<Integer, Object> locks;
    private final Map<Integer, Result> results;
    private final AsyncHandler async;
    public MyThread (AsyncHandler async, Event event, Map<Integer, Object> locks, Map<Integer, Result> results) {
        this.async = async;
        this.event = event;
        this.locks = locks;
        this.results = results;
    }
    @Override
    public void run () {
        Result res = async.processEvent(event);
        results.put(event.id, res);
        Object lock = locks.get(event.id);
        synchronized (lock) {
            lock.notifyAll();
        }
    }
}
class Scheduler extends Thread {
    private int curId = 0;
    private final AsyncHandler async;
    private final Map<Integer, Object> locks;
    private final Map<Integer, Result> results;
    public Scheduler (AsyncHandler async, Map<Integer, Object> locks, Map<Integer, Result> results) {
        this.async = async;
        this.locks = locks;
        this.results = results;
    }
    @Override
    public void run () {
        while (true) {
            Result res = results.get(curId);
            if (res == null) {
                Object lock = locks.get(curId);
                //TODO: eliminate busy waiting
                if (lock == null) {
                    continue;
                }
                synchronized (lock) {
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        System.out.println(e);
                        System.exit(1);
                    }
                }
                res = results.get(curId);
            }
            async.postProcessEvent(res);
            results.remove(curId);
            locks.remove(curId);
            curId++;
        }
    }
}

是的,并发库会简单得多。

ExecutorService 旨在包装线程池和队列,以返回每个任务的 Future 并提供等待结果的任何线程。

如果你想按顺序处理结果

,有一个线程来按顺序处理未来的结果。

按顺序处理异步结果

public class Main {
    public static void main(String[] args) {
        Main main = new Main();
        for (int i = 0; i < 1000; i++) {
            final int finalI = i;
            main.submitTask(new Callable<Long>() {
                @Override
                public Long call() throws Exception {
                    long millis = (long) (Math.pow(2000, Math.random()));
                    Thread.sleep(millis);
                    return millis;
                }
            }, new ResultHandler<Long>() {
                @Override
                public void onFuture(Future<Long> future) throws ExecutionException, InterruptedException {
                    System.out.println(new Date() + ": " + finalI + " - Slept for " + future.get() + " millis");
                }
            });
        }
        main.shutdown();
    }

    public interface ResultHandler<T> {
        void onFuture(Future<T> future) throws Exception;
    }
    private final ExecutorService pool = Executors.newFixedThreadPool(10);
    private final ExecutorService result = Executors.newSingleThreadExecutor();
    public synchronized <T> void submitTask(Callable<T> callable, final ResultHandler<T> resultHandler) {
        final Future<T> future = pool.submit(callable);
        result.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    resultHandler.onFuture(future);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }
    public void shutdown() {
        pool.shutdown();
        result.shutdown();
    }
}

指纹

Wed Oct 02 16:32:07 CEST 2013: 0 - Slept for 1 millis
Wed Oct 02 16:32:07 CEST 2013: 1 - Slept for 1899 millis
Wed Oct 02 16:32:09 CEST 2013: 2 - Slept for 32 millis
Wed Oct 02 16:32:09 CEST 2013: 3 - Slept for 32 millis
Wed Oct 02 16:32:09 CEST 2013: 4 - Slept for 214 millis
Wed Oct 02 16:32:09 CEST 2013: 5 - Slept for 366 millis
... many deleted ...
Wed Oct 02 16:32:09 CEST 2013: 82 - Slept for 6 millis
Wed Oct 02 16:32:09 CEST 2013: 83 - Slept for 1636 millis
Wed Oct 02 16:32:10 CEST 2013: 84 - Slept for 44 millis
Wed Oct 02 16:32:10 CEST 2013: 85 - Slept for 1 millis

您可以看到,尽管某些任务比其他任务花费的时间长得多,但输出的顺序是任务的添加顺序。 您还可以看到它在同一秒(同时)处理许多任务

或者,

您可以将期货排队,而不是将后处理调度到单个线程执行器。逻辑非常相似;单线程执行器也在内部使用队列,但主要区别在于处理结果对象的方式。使用队列允许在最终处理阶段进行循环(即像AWT事件处理工作一样)。这取决于该部分周围的应用,哪种方式更适合。

import java.util.Random;
import java.util.concurrent.*;
public class InOrder
{
  private static final Random rand = new Random();
  final static class Event implements Callable<Result> {
    final int id;
    public Event (int id) {
        this.id = id;
    }
    public Result call() throws InterruptedException {
      // arbitrary long computation
      Thread.sleep(rand.nextInt(10000));
      return new Result(id);
    }
  }
  final static class Result {
    int id;
    public Result(int id) {
        this.id = id;
    }
  }
  static final int STOP_ID = -1;
  private static final ExecutorService POOL = Executors.newFixedThreadPool(10);
  private static final BlockingQueue<Future<Result>> QUEUE = new ArrayBlockingQueue<>(10);
  static void processResults() throws InterruptedException, ExecutionException {
    for(;;) {
      Result r=QUEUE.take().get();
      if(r.id==STOP_ID) return;
      System.out.println("received result id="+r.id);
    }
  }
  public static void main(String[] args)
  {
    POOL.submit(new Callable<Object>() {
      public Object call() throws Exception {
        processResults();
        return null;
      }
    });
    for(int id=0; id<100; id++) try {
      QUEUE.put(POOL.submit(new Event(id)));
    } catch(InterruptedException ex) { break; }
    try { QUEUE.put(new EndMarker()); }
    catch(InterruptedException ex) {}
    POOL.shutdown();
  }
  static final class EndMarker implements Future<Result> {
    public boolean cancel(boolean mayInterruptIfRunning) {
      return false;
    }
    public boolean isCancelled() {
      return false;
    }
    public boolean isDone() {
      return true;
    }
    public Result get() {
      return new Result(STOP_ID);
    }
    public Result get(long timeout, TimeUnit unit) {
      return get();
    }
  }
}

相关内容

  • 没有找到相关文章