在 Java 中使用带有反应式包装器的非阻塞 IO 逐行读取文件



有没有办法使用内置的CompletableFuture或RxJava或Reactor等反应式流库逐行读取本地文件而不阻塞任何线程(后台线程池计为阻塞(

(

有趣的是,HTTP和不同的数据库(如Mongo,Redis等(有许多非阻塞IO库,但我无法找到任何简单的文件读取。

也有类似的问题:

  • 为什么 Java 中的 FileChannel 不是非阻塞的?
  • Java 中的非阻塞文件 IO
Java没有通用的非阻塞文件

IO的主要原因如下:Java是一种跨平台的语言,但Unix没有对文件的非阻塞访问。

如果您为 Windows 编程,则有一个特定于平台的实现 WindowsAsyncFileChannelImpl,它使用非阻塞机制。

虽然,我的问题从亚历山大那里得到了答案,但我想分享我发现的逐行读取文件的最被动的方法。两种解决方案都使用AsynchronousFileChannel,这并不总是非阻塞的,但仍然可以在反应式环境中使用,因为它使用专用线程池进行 IO 工作。

使用Spring Framework中的实用程序(WebFlux应用程序中的理想选择(

import org.springframework.core.codec.StringDecoder;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
public class AsyncFileRead {
    public Flux<String> lines() {
        StringDecoder stringDecoder = StringDecoder.textPlainOnly();
        return DataBufferUtils.readAsynchronousFileChannel(() -> AsynchronousFileChannel.open(Path.of("test/sample.txt"),
                StandardOpenOption.READ), DefaultDataBufferFactory.sharedInstance, 4096)
            .transform(dataBufferFlux -> stringDecoder.decode(dataBufferFlux, null, null, null));
    }
}

使用 RxIo 库

import org.javaync.io.AsyncFiles;
import reactor.core.publisher.Flux;
import java.nio.file.Path;
public class AsyncFileRead {
    public Flux<String> lines() {
        return Flux.from(AsyncFiles.lines(Path.of("test/sample.txt")));
    }
}

最新更新