在executor框架中同步资源



我正在使用executor框架来执行一项大型任务。出于流程状态的目的,我需要统计完成了多少。因此,我创建了一个带有计数器的singleton类来保持计数。

public class ProgramInitializationTracker {
    private static Map<String, Integer> programInitializedTracker = new HashMap<>();
    private static ProgramInitializationTracker instance;
    private ProgramInitializationTracker(){
    }
    public static ProgramInitializationTracker getInstance(){
        if(instance == null){
            synchronized (ProgramInitializationTracker.class) {
                if(instance == null){
                    instance = new ProgramInitializationTracker();
                }
            }
        }
        return instance;
    }
    public Integer getProgramInitializedTracker(String key) {
        return programInitializedTracker.get(key);
    }
    public void setProgramInitializedTracker(String key, int value) {
        synchronized (ProgramInitializationTracker.class) {
            ProgramInitializationTracker.programInitializedTracker.put(key, value);
        }
    }
}

但问题是,仅仅通过同步设置的方法并不能真正确保我有正确的计数值。就我所能得到的多线程。让get函数同步对我有帮助。如果没有,我应该做些什么来纠正它。

当Java已经为您提供了对集合的线程安全访问时,您不应该尝试实现自己的线程安全的访问。

您应该使用ConcurrentHashMap。诸如get之类的读取不会阻塞。

但是,与其使用Integer类型作为存储在映射中的值,不如使用AtomicInteger,这将确保多个试图修改与同一个键关联的值的线程是线程安全的。

在您发布的约束下,只需在提交给ExecutorService的任务和您想要拥有度量的地方之间共享AtomicInteger的实例。variant1用于拥有覆盖所有任务的单个计数器,variant2用于拥有每个任务类型的计数器。这个代码是线程安全的。

@ThreadSafe
class Test {
    private static class CountingRunnable implements Runnable {
        @Nonnull
        private final Runnable actualTask;
        @Nonnull
        private final AtomicInteger submitted;
        public CountingRunnable(@Nonnull Runnable actualTask, @Nonnull AtomicInteger submitted) {
            this.actualTask = actualTask;
            this.submitted = submitted;
        }
        @Override
        public void run() {
            actualTask.run();
            submitted.incrementAndGet();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        variant2();
    }
    private static void variant1() throws InterruptedException {
        ExecutorService service = Executors.newFixedThreadPool(2);
        AtomicInteger counter = new AtomicInteger();
        final CountDownLatch latch = new CountDownLatch(1);
        service.submit(new CountingRunnable(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                    latch.countDown();
                } catch (InterruptedException e) {}
            }
        }, counter));
        latch.await();
        System.out.println(counter.get());
        service.shutdown();
    }
    private enum TaskType {
        TYPE_1,
        TYPE_2
    }
    private static void variant2() throws InterruptedException {
        ExecutorService service = Executors.newFixedThreadPool(2);
        final CountDownLatch latch = new CountDownLatch(2);
        final EnumMap<TaskType, AtomicInteger> metrics = new EnumMap<>(TaskType.class);
        metrics.put(TaskType.TYPE_1, new AtomicInteger());
        metrics.put(TaskType.TYPE_2, new AtomicInteger());
        service.submit(new CountingRunnable(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                    latch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, metrics.get(TaskType.TYPE_1)));
        service.submit(new CountingRunnable(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                    latch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, metrics.get(TaskType.TYPE_2)));
        latch.await();
        System.out.println("type 1: " + metrics.get(TaskType.TYPE_1));
        System.out.println("type 2: " + metrics.get(TaskType.TYPE_2));
        service.shutdown();
    }
}

相关内容

  • 没有找到相关文章

最新更新