我可以使FileInputStream.read阻塞,直到同一文件上的FileOutputStream关闭吗?



在我的应用程序中,我正在接收要存储在文件中的数据,并用它进行一些计算。接收和计算都可能持续很长时间,所以我想异步进行。 下面的列表显示了我的基本设置:thread1生成一些数据并将它们存储在文件中。thread2读取文件并处理数据。

Thread thread1 = new Thread( () -> {
try {
BufferedOutputStream out = new BufferedOutputStream( new FileOutputStream( "test" ) );
for( int i = 0; i < 10; i++ ) {
//producing data...
out.write( ( "hello " + i + "n" ).getBytes() );
out.flush();
//Thread.sleep( 10 );
}
out.close();
} catch( Exception e ) {
e.printStackTrace();
}
} );
thread1.start();
Thread thread2 = new Thread( () -> {
try {
BufferedInputStream in = new BufferedInputStream( new FileInputStream( "test" ) );
int b = in.read();
while( b != -1 ) {
//do some calculation with data
System.out.print( (char)b );
b = in.read();
}
in.close();
} catch( Exception e ) {
e.printStackTrace();
}
} );
thread2.start();

我想根据这个问题同时读取和写入同一个文件是可以的: FileInputStream 和 FileOutputStream 到同一个文件: read(( 是否保证看到"以前发生过"的所有 write((? 还是我在这里错过了什么?

执行上面的清单会按预期生成输出:

hello 0
hello 1
hello 2
hello 3
hello 4
hello 5
hello 6
hello 7
hello 8
hello 9

但是,如果由于某种原因读取器线程比编写器快(可以通过取消注释 thread1 中的 Thread.sleep 行来模拟(,则读取器会读取EOF(-1( 并在文件完全写入之前完成。只放出一行:

hello 0

但是,编写器仍然在"test"文件中生成整个输出。

现在我想做in.read()块,直到线程 1 中的文件输出流关闭。 我认为这可以通过避免将EOF放在文件末尾直到关闭out来完成。这是真的吗,如果是,我该怎么做?还是有更好的方法?

读取器(使用者(可以等待编写器(生产者(,即使接口是文件。但总的来说,最好使用队列并遵循生产者/消费者模式。

无论如何,在这种情况下,粗略的"等待更多输入"过程仅涉及两个Atomic值:

  • 一个用于跟踪写入的字节数(AtomicInteger(
  • 一个表示没有更多字节可用 (AtomicBoolean(

原子变量可以在线程之间共享:两个线程将始终看到原子值的最新值。 然后,编写器可以更新通过AtomicInteger写入的字节数,然后读取器可以决定等待更多输入。 编写器还可以指示是否不再通过AtomicBoolean写入字节,读取器可以使用该信息读取到文件末尾。

要记住的另一件事是,启动线程不在您的控制范围内:您的操作系统将确定线程何时实际开始运行。 要使线程有合理的机会同时运行,请使用"startLatch",如下面的代码所示。

下面的演示代码是可运行的,应该可以很好地说明如何使读取器线程等待来自编写器线程的更多输入。


import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
public class ReadWhileWrite {
public static void main(String[] args) {
ExecutorService executor = Executors.newCachedThreadPool();
try {
CountDownLatch startLatch = new CountDownLatch(2);
Path testFile = Paths.get("test-read-while-write.txt");
testFile.toFile().delete();
int fakeSlowWriteMs = 100; // waiting time in milliseconds between writes. 
CountDownLatch testFileExists = new CountDownLatch(1);
AtomicInteger bytesWritten = new AtomicInteger();
AtomicBoolean writeFinished = new AtomicBoolean();
// Writer
executor.execute(() -> {
try {
// Make sure reader and writer start at the same time
startLatch.countDown();
if (!startLatch.await(1000L, TimeUnit.MILLISECONDS)) {
throw new RuntimeException("Bogus reader start.");
}
try (OutputStream out = Files.newOutputStream(testFile)) {
testFileExists.countDown();
int maxLoops = 10;
IntStream.range(0, maxLoops).forEach(i -> {
byte[] msg = ("hello " + i + "n").getBytes(StandardCharsets.UTF_8);
try {
out.write(msg);
out.flush();
bytesWritten.addAndGet(msg.length);
} catch (IOException e) {
e.printStackTrace();
}
if (fakeSlowWriteMs > 0 && i < maxLoops - 1) {
try {
Thread.sleep(fakeSlowWriteMs);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
} catch (Exception e) {
e.printStackTrace();
}
writeFinished.set(true);
});
// Reader
CountDownLatch doneLatch = new CountDownLatch(1);
executor.execute(() -> {
try {
// Make sure reader and writer start at the same time
startLatch.countDown();
if (!startLatch.await(1000L, TimeUnit.MILLISECONDS)) {
throw new RuntimeException("Bogus writer start.");
}
int bytesRead = 0;
int bytesRequired = 1; // Number of bytes read from file in one go.
int maxWaitTimeMs = 1000;
if (!testFileExists.await(500L, TimeUnit.MILLISECONDS)) {
throw new RuntimeException("Writer did not open file for reading within 500 ms.");
}
try (InputStream in = Files.newInputStream(testFile)) {
boolean eof = false;
while (!eof) {
if (!writeFinished.get()) {
if (bytesWritten.get() - bytesRead < bytesRequired) {
int sleepTimeTotal = 0;
while (!writeFinished.get()) {
Thread.sleep(1);
if (bytesWritten.get() - bytesRead >= bytesRequired) {
break; // break the waiting loop, read the available bytes.
}
sleepTimeTotal += 1;
if (sleepTimeTotal >= maxWaitTimeMs) {
throw new RuntimeException("No bytes available to read within waiting time.");
}
}
}
}
int b = in.read();
bytesRead += 1;
if (b < 0) {
eof = true;
} else {
System.out.print( (char) b);
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
doneLatch.countDown();
});
if (!doneLatch.await(3000L, TimeUnit.MILLISECONDS)) {
throw new RuntimeException("Reader and writer did not finish within 3 seconds.");
}
} catch (Exception e) {
e.printStackTrace();
}
executor.shutdownNow();
System.out.println("nFinished.");
}
}

最新更新