关闭管道中间的流



当我执行这段在流管道期间打开大量文件的代码时:

public static void main(String[] args) throws IOException {
    Files.find(Paths.get("JAVA_DOCS_DIR/docs/api/"),
            100, (path, attr) -> path.toString().endsWith(".html"))
        .map(file -> runtimizeException(() -> Files.lines(file, StandardCharsets.ISO_8859_1)))
        .map(Stream::count)
        .forEachOrdered(System.out::println);
}

我得到一个例外:

java.nio.file.FileSystemException: /long/file/name: Too many open files

问题是Stream.count在完成遍历流时不会关闭流。但我不明白为什么它不应该,因为它是一个终端操作。这同样适用于其他终端操作,例如 reduceforEach . 另一方面,flatMap关闭它所包含的流。

文档告诉我在必要时使用 try-with-resouces-语句来关闭流。就我而言,我可以将count行替换为以下内容:

.map(s -> { long c = s.count(); s.close(); return c; } )

但这既嘈杂又丑陋,在某些情况下,对于大型复杂的管道来说,可能会带来真正的不便。

所以我的问题如下:

  1. 为什么流的设计没有使终端操作关闭他们正在处理的流?这将使它们更好地与 IO 流配合使用。
  2. 关闭管道中的 IO 流的最佳解决方案是什么?

runtimizeException 是一种将检查异常包装在 RuntimeException s 中的方法。

这里有两个问题:处理检查的异常(如 IOException)和及时关闭资源。

预定义的功能接口都没有声明任何已检查的异常,这意味着它们必须在 lambda 中处理,或者包装在未经检查的异常中并重新抛出。看起来您的runtimizeException函数就是这样做的。您可能还必须为其声明自己的功能接口。正如您可能已经发现的那样,这是一种痛苦。

在关闭文件等资源时,有一些调查是在到达流的末尾时自动关闭流。这很方便,但它不处理引发异常时的关闭。在流中没有神奇的做正确的事情机制。

我们只剩下处理资源闭包的标准 Java 技术,即 Java 7 中引入的 try-with-resources 构造。TWR 确实希望资源在调用堆栈中的同一级别关闭,因为它们被打开时。"谁打开它就必须关闭它"的原则适用。TWR还处理异常处理,这通常可以方便地在同一位置处理异常处理和资源关闭。

在此示例中,流有些不寻常,因为它将Stream<Path>映射到Stream<Stream<String>>。这些嵌套流是未关闭的流,当系统用完打开的文件描述符时,会导致最终异常。使这变得困难的是,文件由一个流操作打开,然后传递到下游;这使得无法使用TWR。

构建此管道的另一种方法如下。

Files.lines调用是打开文件的调用,因此这必须是 TWR 语句中的资源。这个文件的处理是(一些)IOExceptions抛出的地方,所以我们可以在同一个 TWR 语句中进行异常包装。这表明使用一个简单的函数将路径映射到行计数,同时处理资源关闭和异常包装:

long lineCount(Path path) {
    try (Stream<String> s = Files.lines(path, StandardCharsets.ISO_8859_1)) {
        return s.count();
    } catch (IOException ioe) {
        throw new UncheckedIOException(ioe);
    }
}

有了这个帮助程序函数后,主管道如下所示:

Files.find(Paths.get("JAVA_DOCS_DIR/docs/api/"),
           100, (path, attr) -> path.toString().endsWith(".html"))
     .mapToLong(this::lineCount)
     .forEachOrdered(System.out::println);

可以创建一个实用程序方法,可靠地关闭管道中间的流。

这可确保每个资源都使用 try-with-resource-语句关闭,但避免了对自定义实用程序方法的需求,并且比直接在 lambda 中编写 try-语句要简单得多。

使用此方法,问题中的管道如下所示:

Files.find(Paths.get("Java_8_API_docs/docs/api"), 100,
        (path, attr) -> path.toString().endsWith(".html"))
    .map(file -> applyAndClose(
        () -> Files.lines(file, StandardCharsets.ISO_8859_1),
        Stream::count))
    .forEachOrdered(System.out::println);

实现如下所示:

/**
 * Applies a function to a resource and closes it afterwards.
 * @param sup Supplier of the resource that should be closed
 * @param op operation that should be performed on the resource before it is closed
 * @return The result of calling op.apply on the resource 
 */
private static <A extends AutoCloseable, B> B applyAndClose(Callable<A> sup, Function<A, B> op) {
    try (A res = sup.call()) {
        return op.apply(res);
    } catch (RuntimeException exc) {
        throw exc;
    } catch (Exception exc) {
        throw new RuntimeException("Wrapped in applyAndClose", exc);
    }
}

(由于需要关闭的资源在分配非运行时异常时通常也会引发异常,因此不需要单独的方法来执行此操作。

您需要

在此流操作中调用close(),这将导致调用所有基础关闭处理程序。

更好的是,将整个语句包装在一个 try-with-resources 块中,因为这样它将自动调用 close 处理程序。

这在您的情况下可能是不可能的,这意味着您需要在某些操作中自己处理它。您当前的方法可能根本不适合流。

看来您确实需要在第二次map()手术中执行此操作。

接口 AutoCloseable关闭只应调用一次。有关详细信息,请参阅自动关闭的文档。

如果最终操作会自动关闭流,则可能会调用两次关闭。请看以下示例:

try (Stream<String> lines = Files.lines(path)) {
    lines.count();
}

正如现在所定义的那样,上的 close 方法将只调用一次。无论最终操作是正常完成,还是在 IOException 中中止操作。如果在最终操作中隐式关闭流,则在发生 IOException 时调用 close 方法一次,如果操作成功完成,则调用两次close 方法。

这是一个替代方法,它使用 Files 中的另一种方法,可以避免泄漏文件描述符:

Files.find(Paths.get("JAVA_DOCS_DIR/docs/api/"),
    100, (path, attr) -> path.toString().endsWith(".html"))
    .map(file -> runtimizeException(() -> Files.readAllLines(file, StandardCharsets.ISO_8859_1).size())
    .forEachOrdered(System.out::println);

与您的版本不同,它将返回行数的int而不是long;但是您没有那么多行的文件,是吗?

最新更新