我有一个问题,当试图实现线程同步与PipedInputStream和PipedOutputStream在Java中。
有三个线程T1, T2, T3可以并发编辑toto.txt文件。toto.txt的文件内容类似于:
T1 : 1 T2 : 1 T3 : 1 T1 : 2 T2 : 2 T3 : 2 T1 : 3 T2 : 3 T3 : 3 ....
我的想法是:每个线程可以访问toto.txt只有当它有一个关键变量key = true。在编辑完文件后,线程A将关键字内容写入连接到PipedOutputStream的pipedInputStream。线程B从PipedOutStream读取key,如果key = true, B可以访问编辑文件。有一个起始线程可以写入文件,另一个线程先等待键->写入文件->写入键到管道。若有3根螺纹,则有3根管子连接:T1-T2、T2-T3、T3-T1。
我的代码线程
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedWriter;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.logging.Level;
import java.util.logging.Logger;
public class threadFlux implements Runnable {
public String _threadName;
public boolean _key;
public boolean _stratingThread;
public int _count;
public int _maxCount;
public String _fileName;
public DataInputStream _is;
public DataOutputStream _os;
public threadFlux(String threadName, String fileName, boolean starting, int maxCount) {
this._threadName = threadName;
this._maxCount = maxCount;
this._count = 1;
this._fileName = fileName;
this._stratingThread = starting;
this._key = (starting == true);
}
@Override
public void run() {
while (this._count <= this._maxCount) {
if (this._stratingThread == true) {
try {
/* starting thread write to file */
System.out.println("startint thread");
System.out.println(this._threadName + ": " + this._count);
this.writeToFile(this._threadName + ": " + this._count + "n");
this._count++;
/* write key to pipe */
this.writeKeyToPipe(this._key);
System.out.println("key written");
/* set key = false */
this._key = false;
this._stratingThread = false;
} catch (IOException ex) {
Logger.getLogger(threadFlux.class.getName()).log(Level.SEVERE, null, ex);
}
} else {
try {
/* read key from pipe */
System.out.println(this._threadName + " Clef " + this._key);
this._key = this.readKeyFromPipe();
System.out.println(this._threadName + " Clef " + this._key);
/* write key to pipe */
System.out.println(this._threadName + ": " + this._count);
this.writeToFile(this._threadName + ": " + this._count + "n");
this._count++;
/* write key to pipe for another thread */
this.writeKeyToPipe(this._key);
this._key = false;
} catch (IOException ex) {
Logger.getLogger(threadFlux.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
System.out.println(this._threadName + " finish!");
}
public void setPipedStream(PipedOutputStream pos, PipedInputStream pis) throws IOException {
this._os = new DataOutputStream(new BufferedOutputStream(pos));
this._is = new DataInputStream(new BufferedInputStream(pis));
}
private void writeToFile(String string) throws IOException {
File file = new File(this._fileName);
//if file doesnt exists, then create it
if (!file.exists()) {
file.createNewFile();
}
//true = append file
FileWriter fileWritter = new FileWriter(file.getName(), true);
try (BufferedWriter bufferWritter = new BufferedWriter(fileWritter)) {
bufferWritter.write(string);
bufferWritter.close();
}
}
private void writeKeyToPipe(boolean _key) throws IOException {
this._os.writeBoolean(_key);
}
private boolean readKeyFromPipe() throws IOException {
return this._is.readBoolean();
}
}
主程序
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.logging.Level;
import java.util.logging.Logger;
public class Test {
public static void main(String[] args) {
try {
// TODO code application logic here
threadFlux runnableThread1 = new threadFlux("T1", "toto.txt", true, 3);
threadFlux runnableThread2 = new threadFlux("T2", "toto.txt", false, 3);
threadFlux runnableThread3 = new threadFlux("T3", "toto.txt", false, 3);
PipedOutputStream pos1 = new PipedOutputStream();
PipedOutputStream pos2 = new PipedOutputStream();
PipedOutputStream pos3 = new PipedOutputStream();
PipedInputStream pis2 = new PipedInputStream(pos1);
PipedInputStream pis1 = new PipedInputStream(pos3);
PipedInputStream pis3 = new PipedInputStream(pos2);
runnableThread1.setPipedStream(pos1, pis1);
runnableThread2.setPipedStream(pos2, pis2);
runnableThread3.setPipedStream(pos3, pis3);
Thread thread1 = new Thread(runnableThread1);
Thread thread2 = new Thread(runnableThread2);
Thread thread3 = new Thread(runnableThread3);
thread1.start();
thread2.start();
thread3.start();
} catch (IOException ex) {
Logger.getLogger(Test.class.getName()).log(Level.SEVERE, null, ex);
} finally {
}
}
}
问题,当我运行这些代码:它被阻塞后,启动线程写入文件,并写入关键到PipedOutputStream。
谢谢你的帮助
PipedOutputStream有一个固定的缓冲区,上次我看了4k。当它被填满时,它会阻塞,直到读线程读到一些东西。所以你的读线程没有在读取。
不要这样做。线程之间的I/O管道基本上是不必要的。你不需要像这样移动数据。