Java中的命名管道和多线程



我是否正确?我假设在同一进程的范围内,有2个线程读/写一个命名管道根本不会阻塞读/写器?所以错误的时机可能会错过一些数据?

并且在多个进程的情况下-读取器将等待直到一些数据可用,并且写入器将被阻塞直到读取器读取读取器提供的所有数据?

我计划使用命名管道从外部进程传递几个(几十个,几百个)文件,并在我的Java应用程序中使用它们。编写简单的单元测试,使用一个线程向管道写入,另一个线程从管道读取,这会导致零星的测试失败,因为缺少数据块。

我认为这是因为线程和相同的过程,所以我的测试一般是不正确的。这个假设正确吗?

这里有一些例子来说明这种情况:

import java.io.{FileOutputStream, FileInputStream, File}
import java.util.concurrent.Executors
import org.apache.commons.io.IOUtils
import org.junit.runner.RunWith
import org.scalatest.FlatSpec
import org.scalatest.junit.JUnitRunner
@RunWith(classOf[JUnitRunner])
class PipeTest extends FlatSpec {
  def md5sum(data: Array[Byte]) = {
    import java.security.MessageDigest
    MessageDigest.getInstance("MD5").digest(data).map("%02x".format(_)).mkString
  }
  "Pipe" should "block here" in {
    val pipe = new File("/tmp/mypipe")
    val srcData = new File("/tmp/random.10m")
    val md5 = "8e0a24d1d47264919f9d47f5223c913e"
    val executor = Executors.newSingleThreadExecutor()
    executor.execute(new Runnable {
      def run() {
        (1 to 10).foreach {
          id =>
            val fis = new FileInputStream(pipe)
            assert(md5 === md5sum(IOUtils.toByteArray(fis)))
            fis.close()
        }
      }
    })
    (1 to 10).foreach {
      id =>
        val is = new FileInputStream(srcData)
        val os = new FileOutputStream(pipe)
        IOUtils.copyLarge(is, os)
        os.flush()
        os.close()
        is.close()
        Thread.sleep(200)
    }
  }
}

没有Thread.sleep(200)测试失败的原因

  • 管道破裂异常
  • MD5 sum错误

与这个延迟设置-它工作得很好。我使用的文件有10兆字节的随机数据

这是代码中一个非常简单的竞争条件:您正在向管道写入固定大小的消息,并假设您可以读取相同的消息。但是,对于任何给定的读取,您都不知道管道中有多少数据可用。

如果你在写操作前加上写的字节数,并确保每次读操作只读这个字节数,你会发现管道的工作方式和宣传的完全一样。

如果您有多个写入器和/或多个读取器的情况,我建议使用实际的消息队列。实际上,我建议在任何情况下使用消息队列,因为它解决了消息边界划分的问题;重新发明那个轮子没有什么意义。

我是否正确?我假设在同一进程的范围内,有2个线程读/写一个命名管道根本不会阻塞读/写器?

除非你正在使用非阻塞I/O,而你没有。

所以错误的时间可能会错过一些数据?

最新更新