我正在研究Multithreaded code
,从中我试图测量特定方法所花费的时间,因为我正在尝试对我们的大多数队友代码进行基准测试,因为我正在对我们的Client code
进行Load and Performance
测试,然后是我们的Service code
。
所以对于这个性能测量,我使用-
System.nanoTime();
我有多线程代码,我从中产生多个线程,并试图衡量代码占用多少时间。
下面是我试图衡量任何代码性能的示例示例-在下面的代码中,我试图衡量-
beClient.getAttributes method
public class BenchMarkTest {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(5);
try {
for (int i = 0; i < 3 * 5; i++) {
executor.submit(new ThreadTask(i));
}
executor.shutdown();
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
} catch (InterruptedException e) {
}
}
}
下面是实现Runnable接口的类
class ThreadTask implements Runnable {
private int id;
public static ConcurrentHashMap<Long, AtomicLong> selectHistogram = new ConcurrentHashMap<Long, AtomicLong>();
public ThreadTask(int id) {
this.id = id;
}
@Override
public void run() {
long start = System.nanoTime();
attributes = beClient.getAttributes(columnsList);
long end = System.nanoTime() - start;
final AtomicLong before = selectHistogram.putIfAbsent(end / 1000000L, new AtomicLong(1L));
if (before != null) {
before.incrementAndGet();
}
}
}
无论我想测量什么代码,我通常把下面一行放在方法的上方
long start = System.nanoTime();
方法相同但ConcurrentHashMap
long end = System.nanoTime() - start;
final AtomicLong before = selectHistogram.putIfAbsent(end / 1000000L, new AtomicLong(1L));
if (before != null) {
before.incrementAndGet();
}
今天我和我的一个高级同事开会,他说ConcurrentHashMap
的incrementAndGet
方法是阻塞调用。所以你的线程将在那里等待一段时间。
他让我做Asynchronous call
。
有可能进行异步调用吗?
因为在我们所有的客户端代码和服务代码中测量每个方法的性能,我使用了相同的上面的三行,我通常放在每个方法的前后来测量这些方法的性能。程序完成后,我将把这些映射的结果打印出来。
所以现在我想使Asynchronous call
?有人能帮我做这件事吗?
基本上,我正在尝试以异步方式测量特定方法的性能,以便每个线程不会等待并被阻塞。
我想,我可以用Futures
来做到这一点。有人能举个相关的例子吗?
谢谢你的帮助
一行:
if (before != null) {
before.incrementAndGet();
}
将锁定当前线程,直到before.incrementAndGet()
获得锁(如果您必须知道,实际上没有锁,有while(true)
和比较-交换方法)并返回长值(您没有使用)。
你可以通过在它自己的线程中调用特定的方法来使它异步,这样就不会阻塞当前线程。
要做到这一点,我相信你已经知道如何:使用Thread.start()
, ExecutorService
或FutureTask
(查看"如何在Java中异步调用方法"如何以优雅的方式做到这一点)。
如果我不清楚,这里有一个使用FutureTask
的解决方案:
public class BenchMarkTest {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(5);
int threadNum = 2;
ExecutorService taskExecutor = Executors.newFixedThreadPool(threadNum);
List<FutureTask<Long>> taskList = new ArrayList<FutureTask<Long>>();
try {
for (int i = 0; i < 3 * 5; i++) {
executor.submit(new ThreadTask(i, taskExecutor, taskList));
}
executor.shutdown();
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
} catch (InterruptedException e) {
}
for (FutureTask<Long> futureTask : taskList) {
futureTask.get(); // doing a job similar to joining threads
}
taskExecutor.shutdown();
}
}
ThreadTask
class:
class ThreadTask implements Runnable {
private int id;
public static ConcurrentHashMap<Long, AtomicLong> selectHistogram = new ConcurrentHashMap<Long, AtomicLong>();
private ExecutorService taskExecutor;
private List<FutureTask<Long>> taskList;
public ThreadTask(int id, ExecutorService taskExecutor, List<FutureTask<Long>> taskList) {
this.id = id;
this.taskExecutor = taskExecutor;
this.taskList = taskList;
}
@Override
public void run() {
long start = System.nanoTime();
attributes = beClient.getAttributes(columnsList);
long end = System.nanoTime() - start;
final AtomicLong before = selectHistogram.putIfAbsent(end / 1000000L, new AtomicLong(1L));
if (before != null) {
FutureTask<Long> futureTask = new FutureTask<Long>(new Callable<Long>() {
public Long call() {
return before.incrementAndGet();
}
});
taskList.add(futureTask);
taskExecutor.execute(futureTask);
}
}
}
更新:
我想到了一个可能的改进:与其让taskExecutor
在ThreadTask
类中执行futureTask
,不如将任务的执行推迟到main
方法的末尾。我的意思是:
删除ThreadTask.run()
:
taskExecutor.execute(futureTask);
并且,在main()
方法中,您有:
for (FutureTask<Long> futureTask : taskList) {
futureTask.get(); // doing a job similar to joining threads
}
taskExecutor.shutdown();
添加任务的执行,从而有:
taskExecutor.invokeAll(taskList);
for (FutureTask<Long> futureTask : taskList) {
futureTask.get(); // doing a job similar to joining threads
}
taskExecutor.shutdown();
(另外,你可以删除ThreadTask
的ExecutorService
字段,因为它将不再使用它。)
这样,在执行基准测试时开销非常小(开销是向taskList
添加一个对象,而不是其他)。完整更新的代码:
public class BenchMarkTest {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(5);
List<FutureTask<Long>> taskList = new ArrayList<FutureTask<Long>>();
try {
for (int i = 0; i < 3 * 5; i++) {
executor.submit(new ThreadTask(i, taskList));
}
executor.shutdown();
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
} catch (InterruptedException e) {
}
int threadNum = 2;
ExecutorService taskExecutor = Executors.newFixedThreadPool(threadNum);
taskExecutor.invokeAll(taskList);
for (FutureTask<Long> futureTask : taskList) {
futureTask.get(); // doing a job similar to joining threads
}
taskExecutor.shutdown();
}
}
class ThreadTask implements Runnable {
private int id;
public static ConcurrentHashMap<Long, AtomicLong> selectHistogram = new ConcurrentHashMap<Long, AtomicLong>();
private List<FutureTask<Long>> taskList;
public ThreadTask(int id, List<FutureTask<Long>> taskList) {
this.id = id;
this.taskList = taskList;
}
@Override
public void run() {
long start = System.nanoTime();
attributes = beClient.getAttributes(columnsList);
long end = System.nanoTime() - start;
final AtomicLong before = selectHistogram.putIfAbsent(end / 1000000L, new AtomicLong(1L));
if (before != null) {
FutureTask<Long> futureTask = new FutureTask<Long>(new Callable<Long>() {
public Long call() {
return before.incrementAndGet();
}
});
taskList.add(futureTask);
}
}
}
我确信增加AtomicLong
比创建Runnable/Callable
对象并将其传递给ExecutorService所需的时间更少。这真的是你的瓶颈吗?如果您真的想要快速并发增量,请参阅JDK8中的LongAdder。
LongAdders可以与ConcurrentHashMap一起使用来维护一个可伸缩的频率映射(一种直方图或multiset的形式)。例如,要向ConcurrentHashMap频率添加一个计数,初始化尚未存在的频率,您可以使用freqs. computeifabsent (k -> new LongAdder()).increment();