CompletableFuture 随着使用 FileReader 进行读取,程序不会退出



背景

构建一个数据管道,异步处理接收到的每个消息。尝试通过模拟行为

  • 正在从文件中读取消息
  • 使用CompletableFuture进行处理

代码

BufferedReader reader = null;
ExecutorService service = Executors.newFixedThreadPool(4);
try {
String filepath = str[0];
FileReaderAsync fileReaderAsync = new FileReaderAsync();
reader = new BufferedReader(new FileReader(filepath));
Random r = new Random();
String line; 
while ((line = reader.readLine()) != null) {
Integer val = Integer.valueOf(line.trim());
int randomInt = r.nextInt(5);
Thread.sleep(randomInt * 100);
CompletableFuture.supplyAsync(() -> {
System.out.println("Square : " + val);
return val * val;
}, service) 
.thenApplyAsync(value -> {
System.out.println(":::::::Double : " + value);
return 2 * value;
}, service)
.thenAccept(value -> {
System.out.println("Answer : " + value);
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
reader.close();
} catch (Exception e) {
throw new RuntimeException(e.getMessage());
}
}

为了简单起见,只粘贴主方法代码,假设变量已声明并在作用域中。

问题

代码

  • 程序运行良好,但没有退出,尝试注释异步逻辑并读取文件。它运行良好,也结束了

设计

  • 在流式处理管道中,如果每个消息都被传递给CompletableFuture进行处理,那么这个异步模型会为每个传入消息工作吗
  • 或者它将阻止处理当前消息
  • 需要引入另一个队列,然后从中消费,而不是在传入消息流入时消费

编辑1添加

public void shutdown() {
service.shutdown();
}

reader.close();
fileReaderAsync.shutdown();

成功了。

问题

您使用的线程池由以下人员创建:

ExecutorService service = Executors.newFixedThreadPool(4);

默认情况下,配置为使用非守护进程线程。正如java.lang.Thread:所记录的那样

当Java虚拟机启动时,通常有一个非守护进程线程(通常调用某个指定类的名为main的方法)。Java虚拟机继续执行线程,直到出现以下任一情况:

  • Runtimeexit方法已被调用,安全管理器已允许进行退出操作
  • 所有不是守护进程线程的线程都已死亡,原因可能是从对run方法的调用返回,也可能是抛出传播到run方法之外的异常

换句话说,任何仍处于活动状态的非守护进程线程也将使JVM保持活动状态。


解决方案

你的问题至少有两种解决办法。

关闭线程池

你可以在完成线程池后关闭它。

service.shutdown(); // Calls ExecutorService#shutdown()

#shutdown()方法开始了一次优雅的关机。它阻止提交任何新任务,但允许完成任何已提交的任务。一旦所有任务完成,池将终止(即允许所有线程死亡)。如果要等待所有任务完成后再继续,则可以在调用#shutdown()/#shutdownNow()之后再调用#awaitTermination(long,TimeUnit)

如果您想尝试立即关闭池,请调用#shutdownNow()。任何当前正在执行的任务都将被取消,任何提交但尚未启动的任务都不会被执行(实际上会在列表中返回给您)。请注意,任务是否响应取消取决于该任务的实现方式。

使用守护程序线程

守护进程线程不会使JVM保持活动状态。您可以通过ThreadFactory将线程池配置为使用守护进程线程。

ExecutorService service = Executors.newFixedThreadPool(4, r -> {
Thread t = new Thread(r); // may want to name the threads
t.setDaemon(true);
return t;
});

请注意,不管怎样,在完成线程池之后,您仍然应该关闭它。

池中有4个线程,但Thread.sleep()会阻塞主线程。你的程序读取一行,最多阻塞5秒,然后会触发异步代码,这根本不需要任何异步,实际上会产生巨大的开销。

不要在异步程序中使用Thread.sleep()。

但我试图得到你的代码的想法,我可以提供这个:

public int calcWork(final int x) {
return x*x;
}
public void iter_async_rec(final BufferedReader reader) {
String line = reader.readline();
if (line != null) {     
int i = Integer.tryParse(line); // checks required       
CompetableFuture.supplyAsync(calcWork(i))
.thenSupplyAsync(i->System.out.println(i))
.thenRunAsync(()->iter_asnc_rec(reader))
}
}

此外:大多数时候,只使用标准执行器是最好的选择。相反,给定的样本不会提高速度。

也许可以看看这个被动的想法!?reactivejava

最新更新