ZipOutputStream blocks the Vert.x event loop



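I am working on a scenario where I read the files in a directory and then create a zip file from them. This operation sometimes blocks the Vert.x thread, and I get the following exception in the Vert.x trace: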

021-06-01 14:46:22.533 io.vertx.core.impl.BlockedThreadChecker [WARNING] Thread Thread[vert.x-eventloop-thread-4,5,main]=Thread[vert.x-eventloop-thread-4,5,main] has been blocked for 65088 ms, time limit is 2000 ms
io.vertx.core.VertxException: Thread blocked
at java.util.zip.Deflater.deflateBytes(Native Method)
at java.util.zip.Deflater.deflate(Deflater.java:444)
at java.util.zip.Deflater.deflate(Deflater.java:366)
at java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:251)
at java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211)
at java.util.zip.ZipOutputStream.write(ZipOutputStream.java:331)

Here is the method I use to create the zip file:

private String zipDirectory(File dir, String zipDirName) {
    _log.info("Entered zip file utility2");
    String zipFilePath;
    try (FileOutputStream fos = new FileOutputStream(zipDirName);
         ZipOutputStream zos = new ZipOutputStream(fos);) {
        populateFilesList(dir);

        for (String filePath : _filesListInDir) {
            _log.info("FILES: " + filePath);
            File file = new File(filePath);

            if (!"zip".equals(Files.getFileExtension(file.getName()))) {
                ZipEntry ze = new ZipEntry(filePath.substring(dir.getAbsolutePath().length() + 1, filePath.length()));
                zos.putNextEntry(ze);
                FileInputStream fis = new FileInputStream(filePath);
                byte[] buffer = new byte[1024];
                int len;
                while ((len = fis.read(buffer)) > 0) {
                    zos.write(buffer, 0, len);
                }
                zos.flush();
                zos.closeEntry();
                fis.close();

            } else {
                _log.info("Ignore zip for writing");
            }

        }
        Path zipFilePathDir = Paths.get(zipDirName);
        zipFilePath = zipFilePathDir.getFileName().toString();
        _log.info("Zip file name: " + zipFilePath);
        zos.close();
        fos.close();
    } catch (IOException e) {
        zipFilePath = "FAILURE";
        _log.error("Error creating zip file: " + e.getMessage());
    }

    return zipFilePath;
}

Can anyone offer any suggestions? How can I make sure I don't block the main event loop in Vert.x?

You can use Vertx.executeBlocking to run the method in the worker pool managed by Vert.x:

Future<String> fut = vertx.executeBlocking(promise -> promise.complete(zipDirectory(dir, zipDirName)));

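If your method needs to block for more than 5 or 10 seconds, you may also want to create your own dedicated thread pool to run it, which Vert.x calls a WorkerExecutor, and use WorkerExecutor.executeBlocking: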

// Create a WorkerExecutor with 1 thread, where each method call
// can run for 2 minutes before Vert.x logs blocked thread warnings
WorkerExecutor we = vertx.createSharedWorkerExecutor("zip", 1, 2, TimeUnit.MINUTES);
Future<String> fut = we.executeBlocking(promise -> promise.complete(zipDirectory(dir, zipDirName)));
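
Whichever pool runs it, the zip method from the question can leak its FileInputStream if a write throws, and it reports errors by returning the string "FAILURE". Below is a minimal sketch of a tighter variant that uses only JDK classes. The class name ZipUtil, the Path-based signature, and the choice to throw IOException instead of returning a sentinel are assumptions made for illustration, not part of the original code. It is still a blocking call and should still go through executeBlocking.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

// Hypothetical helper; the name ZipUtil is an assumption for illustration.
public final class ZipUtil {

    private ZipUtil() {}

    /**
     * Zips every regular file under dir (skipping *.zip files) into zipFile
     * and returns the zip's file name. Blocking: call it from a worker thread.
     */
    public static String zipDirectory(Path dir, Path zipFile) throws IOException {
        // Collect the file list up front so the directory stream is closed
        // before any writing starts.
        List<Path> files;
        try (Stream<Path> walk = Files.walk(dir)) {
            files = walk.filter(Files::isRegularFile)
                        .filter(p -> !p.getFileName().toString().endsWith(".zip"))
                        .collect(Collectors.toList());
        }

        // try-with-resources closes both streams even if a write fails.
        try (OutputStream out = Files.newOutputStream(zipFile);
             ZipOutputStream zos = new ZipOutputStream(out)) {
            for (Path file : files) {
                // Entry names are relative to dir and use '/' as the separator.
                String entryName = dir.relativize(file).toString().replace('\\', '/');
                zos.putNextEntry(new ZipEntry(entryName));
                Files.copy(file, zos); // streams the file into the current entry
                zos.closeEntry();
            }
        }
        return zipFile.getFileName().toString();
    }
}
```

With a signature like this, the lambda passed to executeBlocking would need to catch IOException and call promise.fail(e), so the caller sees a failed Future rather than a "FAILURE" string.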
