与PipedInputStream和PipedOutputStream的线程同步



我有一个问题,当试图实现线程同步与PipedInputStream和PipedOutputStream在Java中。

有三个线程T1, T2, T3可以并发编辑toto.txt文件。toto.txt的文件内容类似于:

T1 : 1 T2 : 1 T3 : 1 T1 : 2 T2 : 2 T3 : 2 T1 : 3 T2 : 3 T3 : 3 ....

我的想法是:每个线程可以访问toto.txt只有当它有一个关键变量key = true。在编辑完文件后,线程A将关键字内容写入连接到PipedOutputStream的pipedInputStream。线程B从PipedOutStream读取key,如果key = true, B可以访问编辑文件。有一个起始线程可以写入文件,另一个线程先等待键->写入文件->写入键到管道。若有3根螺纹,则有3根管子连接:T1-T2、T2-T3、T3-T1。

我的代码线程

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedWriter;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.logging.Level;
import java.util.logging.Logger;
public class threadFlux implements Runnable {
    public String _threadName;
    public boolean _key;
    public boolean _stratingThread;
    public int _count;
    public int _maxCount;
    public String _fileName;
    public DataInputStream _is;
    public DataOutputStream _os;
    public threadFlux(String threadName, String fileName, boolean starting, int maxCount) {
        this._threadName = threadName; 
        this._maxCount = maxCount;
        this._count = 1;
        this._fileName = fileName;
        this._stratingThread = starting;
        this._key = (starting == true);
    }
    @Override
    public void run() {
        while (this._count <= this._maxCount) {
            if (this._stratingThread == true) {
                try {
                    /* starting thread write to file */
                    System.out.println("startint thread");
                    System.out.println(this._threadName + ": " + this._count);
                    this.writeToFile(this._threadName + ": " + this._count + "n");
                    this._count++;
                    /* write key to pipe */
                    this.writeKeyToPipe(this._key);
                    System.out.println("key written");
                    /* set key = false */
                    this._key = false;
                    this._stratingThread = false;
                } catch (IOException ex) {
                    Logger.getLogger(threadFlux.class.getName()).log(Level.SEVERE, null, ex);
                }
            } else {
                try {
                    /* read key from pipe */
                    System.out.println(this._threadName + " Clef " + this._key);
                    this._key = this.readKeyFromPipe();
                    System.out.println(this._threadName + " Clef " + this._key);
                    /* write key to pipe */
                    System.out.println(this._threadName + ": " + this._count);
                    this.writeToFile(this._threadName + ": " + this._count + "n");
                    this._count++;
                    /* write key to pipe for another thread */
                    this.writeKeyToPipe(this._key);
                    this._key = false;
                } catch (IOException ex) {
                    Logger.getLogger(threadFlux.class.getName()).log(Level.SEVERE, null, ex);
                }
            }
        }
        System.out.println(this._threadName + " finish!");
    }
    public void setPipedStream(PipedOutputStream pos, PipedInputStream pis) throws IOException {
        this._os = new DataOutputStream(new BufferedOutputStream(pos));
        this._is = new DataInputStream(new BufferedInputStream(pis));
    }
    private void writeToFile(String string) throws IOException {
        File file = new File(this._fileName);
        //if file doesnt exists, then create it
        if (!file.exists()) {
            file.createNewFile();
        }
        //true = append file
        FileWriter fileWritter = new FileWriter(file.getName(), true);
        try (BufferedWriter bufferWritter = new BufferedWriter(fileWritter)) {
            bufferWritter.write(string);
            bufferWritter.close();
        }
    }
    private void writeKeyToPipe(boolean _key) throws IOException {
        this._os.writeBoolean(_key);      
    }
    private boolean readKeyFromPipe() throws IOException {
        return this._is.readBoolean();
    }
}

主程序

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.logging.Level;
import java.util.logging.Logger;
public class Test {
    public static void main(String[] args) {
        try {
            // TODO code application logic here
            threadFlux runnableThread1 = new threadFlux("T1", "toto.txt", true, 3);
            threadFlux runnableThread2 = new threadFlux("T2", "toto.txt", false, 3);
            threadFlux runnableThread3 = new threadFlux("T3", "toto.txt", false, 3);
            PipedOutputStream pos1 = new PipedOutputStream();           
            PipedOutputStream pos2 = new PipedOutputStream();
            PipedOutputStream pos3 = new PipedOutputStream();
            PipedInputStream pis2 = new PipedInputStream(pos1);
            PipedInputStream pis1 = new PipedInputStream(pos3);
            PipedInputStream pis3 = new PipedInputStream(pos2);
            runnableThread1.setPipedStream(pos1, pis1);
            runnableThread2.setPipedStream(pos2, pis2);
            runnableThread3.setPipedStream(pos3, pis3);
            Thread thread1 = new Thread(runnableThread1);
            Thread thread2 = new Thread(runnableThread2);
            Thread thread3 = new Thread(runnableThread3);
            thread1.start();
            thread2.start();
            thread3.start();
        } catch (IOException ex) {
            Logger.getLogger(Test.class.getName()).log(Level.SEVERE, null, ex);
        } finally {        
        }
    }
}

问题,当我运行这些代码:它被阻塞后,启动线程写入文件,并写入关键到PipedOutputStream。

谢谢你的帮助

PipedOutputStream有一个固定的缓冲区,上次我看了4k。当它被填满时,它会阻塞,直到读线程读到一些东西。所以你的读线程没有在读取。

不要这样做。线程之间的I/O管道基本上是不必要的。你不需要像这样移动数据。

相关内容

  • 没有找到相关文章

最新更新