
我想知道我还有什么其他选择,以便使用 golang 从命名管道连续读取。我当前的代码依赖于在 gorutine 内运行的无限 for 循环;但 Hat 将一个 CPU 保持在 100% 使用率。

func main() {
var wg sync.WaitGroup
fpipe, _ := os.OpenFile(namedPipe, os.O_RDONLY, 0600)
defer fpipe.Close()
f, _ := os.Create("dump.txt")
defer f.Close()
var buff bytes.Buffer
go func() {
for {
io.Copy(&buff, fpipe)
if buff.Len() > 0 {


如前所述,如果没有剩余的编写器,命名管道读取器将收到 EOF。


  1. 命名管道具有最大容量(65kB,iirc(,很可能在100毫秒的睡眠期内被填满。当缓冲区被填满时,所有写入器都会无缘无故地阻塞。
  2. 如果重新启动,您将平均丢失 50 毫秒的数据。再次,没有充分的理由。
  3. 如果您想使用静态缓冲区进行复制,恕我直言,io.CopyBuffer(dst Writer, src Reader, buf []byte) (written int64, err error)将是更好的解决方案。但这甚至不是必需的,因为io.Copy(或底层实现(实际上分配了 32kB 的缓冲区。


更好的解决方案是等待写入发生,并立即将命名管道的内容复制到目标文件。在大多数系统上,文件系统事件都有某种通知。包 github.com/rjeczalik/notify 可用于访问我们感兴趣的事件,因为写入事件在大多数重要的操作系统上跨平台工作。对我们来说有趣的另一个事件是删除命名管道,因为我们没有任何可读的内容。


package main
import (
const (
var (
pipePath string
filePath string
func init() {
flag.StringVar(&pipePath, "pipe", "", "/path/to/named_pipe to read from")
flag.StringVar(&filePath, "file", "out.txt", "/path/to/output file")
func main() {
var p, f *os.File
var err error
var e notify.EventInfo
// The usual stuff: checking wether the named pipe exists etc
if p, err = os.Open(pipePath); os.IsNotExist(err) {
log.Fatalf("Named pipe '%s' does not exist", pipePath)
} else if os.IsPermission(err) {
log.Fatalf("Insufficient permissions to read named pipe '%s': %s", pipePath, err)
} else if err != nil {
log.Fatalf("Error while opening named pipe '%s': %s", pipePath, err)
// Yep, there and readable. Close the file handle on exit
defer p.Close()
// Do the same for the output file
if f, err = os.OpenFile(filePath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600); os.IsNotExist(err) {
log.Fatalf("File '%s' does not exist", filePath)
} else if os.IsPermission(err) {
log.Fatalf("Insufficient permissions to open/create file '%s' for appending: %s", filePath, err)
} else if err != nil {
log.Fatalf("Error while opening file '%s' for writing: %err", filePath, err)
// Again, close the filehandle on exit
defer f.Close()
// Here is where it happens. We create a buffered channel for events which might happen
// on the file. The reason why we make it buffered to the number of expected concurrent writers
// is that if all writers would (theoretically) write at once or at least pretty close
// to each other, it might happen that we loose event. This is due to the underlying implementations
// not because of go.
c := make(chan notify.EventInfo, MAX_CONCURRENT_WRITERS)
// Here we tell notify to watch out named pipe for events, Write and Remove events
// specifically. We watch for remove events, too, as this removes the file handle we
// read from, making reads impossible
notify.Watch(pipePath, c, notify.Write|notify.Remove)
// We start an infinite loop...
for {
// ...waiting for an event to be passed.
e = <-c
switch e.Event() {
case notify.Write:
// If it a a write event, we copy the content of the named pipe to
// our output file and wait for the next event to happen.
// Note that this is idempotent: Even if we have huge writes by multiple
// writers on the named pipe, the first call to Copy will copy the contents.
// The time to copy that data may well be longer than it takes to generate the events.
// However, subsequent calls may copy nothing, but that does not do any harm.
io.Copy(f, p)
case notify.Remove:
// Some user or process has removed the named pipe,
// so we have nothing left to read from.
// We should inform the user and quit.
log.Fatalf("Named pipe '%s' was removed. Quitting", pipePath)


在 Go 程序中,如果您想等待新的写入器,则必须在 for 循环中轮询io.Reader。您当前的代码通过繁忙循环执行此操作,这将消耗 100% 的 CPU 内核。添加睡眠和返回其他错误的方法将解决此问题:

for {
err := io.Copy(&buff, fpipe)
if buff.Len() > 0 {
if err != nil {
// something other than EOF happened
time.Sleep(100 * time.Millisecond)

问题:当"最后一个写入器"关闭管道时,即使稍后可能会出现新的写入器,您也会得到一个 EOF。


nullWriter, err := os.OpenFile(pipePath, os.O_WRONLY, os.ModeNamedPipe)
if err != nil {
logger.Crit("Error opening pipe for (placeholder) write", "err", err)
defer nullWriter.Close()
