Java ExecutorService:- 通知线程在事件发生时唤醒



>我有一个管理器类,多个线程将自己注册到该类(UUID用于为每个请求生成唯一标识符),提供有效负载来处理并从管理器获取相应的响应。我正在使用java.util.concurrent.ExecutorService来启动多个线程。这是测试我的管理器功能的实现-

public class ManagerTest {
public static void main(String[] args) {
try {
Manager myManager = new Manager();
// Start listening to the messages from different threads
myManager.consumeMessages();
int num_threads = Integer.parseInt(args[0]);
ExecutorService executor = Executors.newFixedThreadPool(num_threads);
for (int i = 0; i < num_threads; i++) {
// class implementation is given below
Runnable worker = new MyRunnable(myManager);
executor.execute(worker);
}
executor.shutdown();
// Wait until all threads are finish
while (!executor.isTerminated()) {
}
System.out.println("nFinished all threads");
myManager.closeConnection();
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}

这是MyRunnable类的实现

class MyRunnable implements Runnable {
private Manager managerObj;
public MyRunnable(Manager managerObj) {
this.managerObj = managerObj;
}
@Override
public void run() {     
try {
Random rand = new Random();
int  n = rand.nextInt(35);
String requestId = UUID.randomUUID().toString();
managerObj.registerRequest(requestId, n);
managerObj.publishMessage(requestId);
// Want to avoid this while loop
while( ! managerObj.getRequestStatus(requestId)){
}
int response = managerObj.getRequestResponse(requestId);
// do something else
managerObj.unregisterRequest(requestId);
} catch (IOException e) {
e.printStackTrace();
}
}
}

管理器将处理请求,根据有效负载,请求的响应可能需要不同的时间。每当管理器获得响应时,它都会通过调用此函数setRequestStatus(requestId)将请求状态设置为 true。在此之后,线程将从while loop退出并继续执行。

代码如果工作正常,但线程正在通过不断循环 while 循环直到满足条件来执行太多工作。

它们是在向管理器发送请求后使线程休眠的一种方式,管理器在响应准备就绪时会向该线程发出唤醒信号。

如果这对某人来说听起来太简单了,请原谅我,我是 java 和 java 线程接口的新手。

没关系,我们习惯了简单的问题,你的问题其实写得很好,有一个合理的问题要解决,我们每天都在这里看到更糟糕的情况,比如人们不知道他们想要什么,如何要求等等。

所以,你正在做的是一个繁忙的旋转循环,这是一件非常糟糕的事情,因为 a) 它每个线程消耗一个完整的 CPU 内核,b) 它实际上使 CPU 保持繁忙,这意味着它正在从其他线程窃取处理时间,这些线程可能有有用的工作要做。

有很多方法可以解决这个问题,我将从最差到最好列出它们。

  • 改进代码的最简单方法是调用java.lang.Thread.sleep(long millis)方法,将其作为参数传递0。这也称为"yield"操作,它本质上意味着"如果还有其他线程有一些有用的工作要做,让它们运行,并在完成后返回给我。 这仅比繁忙旋转循环略好,因为它仍然会消耗 100% 的 CPU。好处是它只会消耗CPU,而其他线程没有任何关系,所以至少它不会减慢其他事情的速度。

  • 改进代码的下一个最佳但仍然不是很聪明的方法是调用java.lang.Thread.sleep(long millis)方法将其作为参数传递1。 这被称为"传递"操作,它本质上意味着"将我的时间片的剩余部分释放给任何其他可能有一些有用工作要做的线程"。 换句话说,即使不需要在整个系统中完成有用的工作,时间片的剩余部分也会被没收。 这将使 CPU 消耗降至几乎为零。 缺点是 a) CPU 消耗实际上略高于零,b) 机器将无法进入某些低功耗睡眠模式,以及 c) 您的工作线程响应速度会稍微慢一些:它将拾取仅在时间片边界上执行的工作。

  • 解决问题的最佳方法是使用 java 中内置的同步机制,如以下答案所述: https://stackoverflow.com/a/5999146/773113 这不仅会消耗零 CPU,甚至会允许机器在等待时进入低功耗模式。

  • 为了解决最一般情况下的问题,您不想只是等到一个条件,而是实际上也传递有关已完成的工作或要完成的工作的信息,您将使用BlockingQueue。 (https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/BlockingQueue.html) 阻塞队列使用 java 内置同步机制工作,并允许一个线程将信息传递给另一个线程。

@MikeNakis已经有一个很好的答案,但我也倾向于提供另一种选择。

但是,此选项涉及更改管理器 API 以返回未来

我建议对经理的更改如下:

  • 删除getRequestStatus()方法
  • getRequestResponse()返回Future<Integer>

有了这些更改,MyRunnablerun()方法可以更改为:

public void run() {     
try {
Random rand = new Random();
int  n = rand.nextInt(35);
String requestId = UUID.randomUUID().toString();
managerObj.registerRequest(requestId, n);
managerObj.publishMessage(requestId);
// Future.get() blocks and waits for the result without consuming CPU
int response = managerObj.getRequestResponse(requestId).get();
// do something else
managerObj.unregisterRequest(requestId);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

实现Future<>的最简单方法可能是使用 Java 的java.util.concurrent.FutureTask

最新更新