AtomicLong的incrementAndGet方法阻塞调用



我正在研究Multithreaded code,从中我试图测量特定方法所花费的时间,因为我正在尝试对我们的大多数队友代码进行基准测试,因为我正在对我们的Client code进行Load and Performance测试,然后是我们的Service code

所以对于这个性能测量,我使用-

System.nanoTime();

我有多线程代码,我从中产生多个线程,并试图衡量代码占用多少时间。

下面是我试图衡量任何代码性能的示例示例-在下面的代码中,我试图衡量-

beClient.getAttributes method

下面是代码-
public class BenchMarkTest {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        try {
            for (int i = 0; i < 3 * 5; i++) {
                executor.submit(new ThreadTask(i));
            }
            executor.shutdown();
            executor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
        } catch (InterruptedException e) {
        }
    }
}
下面是实现Runnable接口的类
class ThreadTask implements Runnable {
    private int id;
    public static ConcurrentHashMap<Long, AtomicLong> selectHistogram = new ConcurrentHashMap<Long, AtomicLong>();

    public ThreadTask(int id) {
        this.id = id;
    }
    @Override
    public void run() {

        long start = System.nanoTime();
        attributes = beClient.getAttributes(columnsList);
        long end = System.nanoTime() - start;
        final AtomicLong before = selectHistogram.putIfAbsent(end / 1000000L, new AtomicLong(1L));
        if (before != null) {
            before.incrementAndGet();
        }
    }
}

无论我想测量什么代码,我通常把下面一行放在方法的上方

long start = System.nanoTime();

方法相同但ConcurrentHashMap

后面这两行
long end = System.nanoTime() - start;
final AtomicLong before = selectHistogram.putIfAbsent(end / 1000000L, new AtomicLong(1L));
        if (before != null) {
            before.incrementAndGet();
        }

今天我和我的一个高级同事开会,他说ConcurrentHashMapincrementAndGet方法是阻塞调用。所以你的线程将在那里等待一段时间。

他让我做Asynchronous call

有可能进行异步调用吗?

因为在我们所有的客户端代码和服务代码中测量每个方法的性能,我使用了相同的上面的三行,我通常放在每个方法的前后来测量这些方法的性能。程序完成后,我将把这些映射的结果打印出来。

所以现在我想使Asynchronous call ?有人能帮我做这件事吗?

基本上,我正在尝试以异步方式测量特定方法的性能,以便每个线程不会等待并被阻塞。

我想,我可以用Futures来做到这一点。有人能举个相关的例子吗?

谢谢你的帮助

一行:

if (before != null) {
    before.incrementAndGet();
}

将锁定当前线程,直到before.incrementAndGet()获得锁(如果您必须知道,实际上没有锁,有while(true)和比较-交换方法)并返回长值(您没有使用)。

你可以通过在它自己的线程中调用特定的方法来使它异步,这样就不会阻塞当前线程。

要做到这一点,我相信你已经知道如何:使用Thread.start(), ExecutorServiceFutureTask(查看"如何在Java中异步调用方法"如何以优雅的方式做到这一点)。

如果我不清楚,这里有一个使用FutureTask的解决方案:

public class BenchMarkTest {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        int threadNum = 2;
        ExecutorService taskExecutor = Executors.newFixedThreadPool(threadNum);
        List<FutureTask<Long>> taskList = new ArrayList<FutureTask<Long>>();
        try {
            for (int i = 0; i < 3 * 5; i++) {
                executor.submit(new ThreadTask(i, taskExecutor, taskList));
            }
            executor.shutdown();
            executor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
        } catch (InterruptedException e) {
        }
        for (FutureTask<Long> futureTask : taskList) {
            futureTask.get(); // doing a job similar to joining threads
        }
        taskExecutor.shutdown();
    }
}

ThreadTask class:

class ThreadTask implements Runnable {
    private int id;
    public static ConcurrentHashMap<Long, AtomicLong> selectHistogram = new ConcurrentHashMap<Long, AtomicLong>();
    private ExecutorService taskExecutor;
    private List<FutureTask<Long>> taskList;    
    public ThreadTask(int id, ExecutorService taskExecutor, List<FutureTask<Long>> taskList) {
        this.id = id;
        this.taskExecutor = taskExecutor;
        this.taskList = taskList;
    }
    @Override
    public void run() {

        long start = System.nanoTime();
        attributes = beClient.getAttributes(columnsList);
        long end = System.nanoTime() - start;
        final AtomicLong before = selectHistogram.putIfAbsent(end / 1000000L, new AtomicLong(1L));
        if (before != null) {
            FutureTask<Long> futureTask = new FutureTask<Long>(new Callable<Long>() {
                public Long call() {
                    return before.incrementAndGet();
                }
            });
            taskList.add(futureTask);
            taskExecutor.execute(futureTask);
        }
    }
}

更新:

我想到了一个可能的改进:与其让taskExecutorThreadTask类中执行futureTask,不如将任务的执行推迟到main方法的末尾。我的意思是:

删除ThreadTask.run():

            taskExecutor.execute(futureTask);

并且,在main()方法中,您有:

        for (FutureTask<Long> futureTask : taskList) {
            futureTask.get(); // doing a job similar to joining threads
        }
        taskExecutor.shutdown();

添加任务的执行,从而有:

        taskExecutor.invokeAll(taskList);
        for (FutureTask<Long> futureTask : taskList) {
            futureTask.get(); // doing a job similar to joining threads
        }
        taskExecutor.shutdown();

(另外,你可以删除ThreadTaskExecutorService字段,因为它将不再使用它。)

这样,在执行基准测试时开销非常小(开销是向taskList添加一个对象,而不是其他)。

完整更新的代码:

public class BenchMarkTest {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        List<FutureTask<Long>> taskList = new ArrayList<FutureTask<Long>>();
        try {
            for (int i = 0; i < 3 * 5; i++) {
                executor.submit(new ThreadTask(i, taskList));
            }
            executor.shutdown();
            executor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
        } catch (InterruptedException e) {
        }
        int threadNum = 2;
        ExecutorService taskExecutor = Executors.newFixedThreadPool(threadNum);
        taskExecutor.invokeAll(taskList);
        for (FutureTask<Long> futureTask : taskList) {
            futureTask.get(); // doing a job similar to joining threads
        }
        taskExecutor.shutdown();
    }
}

class ThreadTask implements Runnable {
    private int id;
    public static ConcurrentHashMap<Long, AtomicLong> selectHistogram = new ConcurrentHashMap<Long, AtomicLong>();
    private List<FutureTask<Long>> taskList;    
    public ThreadTask(int id, List<FutureTask<Long>> taskList) {
        this.id = id;
        this.taskList = taskList;
    }
    @Override
    public void run() {

        long start = System.nanoTime();
        attributes = beClient.getAttributes(columnsList);
        long end = System.nanoTime() - start;
        final AtomicLong before = selectHistogram.putIfAbsent(end / 1000000L, new AtomicLong(1L));
        if (before != null) {
            FutureTask<Long> futureTask = new FutureTask<Long>(new Callable<Long>() {
                public Long call() {
                    return before.incrementAndGet();
                }
            });
            taskList.add(futureTask);
        }
    }
}

我确信增加AtomicLong比创建Runnable/Callable对象并将其传递给ExecutorService所需的时间更少。这真的是你的瓶颈吗?如果您真的想要快速并发增量,请参阅JDK8中的LongAdder。

LongAdders可以与ConcurrentHashMap一起使用来维护一个可伸缩的频率映射(一种直方图或multiset的形式)。例如,要向ConcurrentHashMap频率添加一个计数,初始化尚未存在的频率,您可以使用freqs. computeifabsent (k -> new LongAdder()).increment();

相关内容

  • 没有找到相关文章

最新更新