背景
构建一个数据管道,异步处理接收到的每个消息。尝试通过模拟行为
- 正在从文件中读取消息
- 使用CompletableFuture进行处理
代码
BufferedReader reader = null;
ExecutorService service = Executors.newFixedThreadPool(4);
try {
String filepath = str[0];
FileReaderAsync fileReaderAsync = new FileReaderAsync();
reader = new BufferedReader(new FileReader(filepath));
Random r = new Random();
String line;
while ((line = reader.readLine()) != null) {
Integer val = Integer.valueOf(line.trim());
int randomInt = r.nextInt(5);
Thread.sleep(randomInt * 100);
CompletableFuture.supplyAsync(() -> {
System.out.println("Square : " + val);
return val * val;
}, service)
.thenApplyAsync(value -> {
System.out.println(":::::::Double : " + value);
return 2 * value;
}, service)
.thenAccept(value -> {
System.out.println("Answer : " + value);
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
reader.close();
} catch (Exception e) {
throw new RuntimeException(e.getMessage());
}
}
为了简单起见,只粘贴主方法代码,假设变量已声明并在作用域中。
问题
代码
- 程序运行良好,但没有退出,尝试注释异步逻辑并读取文件。它运行良好,也结束了
设计
- 在流式处理管道中,如果每个消息都被传递给CompletableFuture进行处理,那么这个异步模型会为每个传入消息工作吗
- 或者它将阻止处理当前消息
- 需要引入另一个队列,然后从中消费,而不是在传入消息流入时消费
编辑1添加
public void shutdown() {
service.shutdown();
}
和
reader.close();
fileReaderAsync.shutdown();
成功了。
问题
您使用的线程池由以下人员创建:
ExecutorService service = Executors.newFixedThreadPool(4);
默认情况下,配置为使用非守护进程线程。正如java.lang.Thread:所记录的那样
当Java虚拟机启动时,通常有一个非守护进程线程(通常调用某个指定类的名为
main
的方法)。Java虚拟机继续执行线程,直到出现以下任一情况:
- 类
Runtime
的exit
方法已被调用,安全管理器已允许进行退出操作- 所有不是守护进程线程的线程都已死亡,原因可能是从对
run
方法的调用返回,也可能是抛出传播到run
方法之外的异常
换句话说,任何仍处于活动状态的非守护进程线程也将使JVM保持活动状态。
解决方案
你的问题至少有两种解决办法。
关闭线程池
你可以在完成线程池后关闭它。
service.shutdown(); // Calls ExecutorService#shutdown()
#shutdown()
方法开始了一次优雅的关机。它阻止提交任何新任务,但允许完成任何已提交的任务。一旦所有任务完成,池将终止(即允许所有线程死亡)。如果要等待所有任务完成后再继续,则可以在调用#shutdown()
/#shutdownNow()
之后再调用#awaitTermination(long,TimeUnit)
。
如果您想尝试立即关闭池,请调用#shutdownNow()
。任何当前正在执行的任务都将被取消,任何提交但尚未启动的任务都不会被执行(实际上会在列表中返回给您)。请注意,任务是否响应取消取决于该任务的实现方式。
使用守护程序线程
守护进程线程不会使JVM保持活动状态。您可以通过ThreadFactory
将线程池配置为使用守护进程线程。
ExecutorService service = Executors.newFixedThreadPool(4, r -> {
Thread t = new Thread(r); // may want to name the threads
t.setDaemon(true);
return t;
});
请注意,不管怎样,在完成线程池之后,您仍然应该关闭它。
池中有4个线程,但Thread.sleep()会阻塞主线程。你的程序读取一行,最多阻塞5秒,然后会触发异步代码,这根本不需要任何异步,实际上会产生巨大的开销。
不要在异步程序中使用Thread.sleep()。
但我试图得到你的代码的想法,我可以提供这个:
public int calcWork(final int x) {
return x*x;
}
public void iter_async_rec(final BufferedReader reader) {
String line = reader.readline();
if (line != null) {
int i = Integer.tryParse(line); // checks required
CompetableFuture.supplyAsync(calcWork(i))
.thenSupplyAsync(i->System.out.println(i))
.thenRunAsync(()->iter_asnc_rec(reader))
}
}
此外:大多数时候,只使用标准执行器是最好的选择。相反,给定的样本不会提高速度。
也许可以看看这个被动的想法!?reactivejava