Reading continuously from a named pipe



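I would like to know what other options I have for reading continuously from a named pipe using golang. My current code relies on an infinite for loop running inside a goroutine, but that keeps one CPU at 100% usage.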

func main() {
	....
	var wg sync.WaitGroup
	fpipe, _ := os.OpenFile(namedPipe, os.O_RDONLY, 0600)
	defer fpipe.Close()
	f, _ := os.Create("dump.txt")
	defer f.Close()
	var buff bytes.Buffer
	wg.Add(1)
	go func() {
		for {
			io.Copy(&buff, fpipe)
			if buff.Len() > 0 {
				buff.WriteTo(f)
			}
		}
	}()
	wg.Wait()
}

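Introduction

As mentioned, a named pipe reader receives EOF once there are no writers left.

However, I find @JimB's solution less than optimal:

  1. Named pipes have a maximum capacity (64 KiB, i.e. 65,536 bytes, on Linux, iirc), which may well be filled within the 100ms sleep period. Once the buffer is full, all writers block for no good reason.
  2. If a reboot happens, you will lose on average 50ms worth of data. Again, for no good reason.
  3. If you want to use a static buffer for copying, io.CopyBuffer(dst Writer, src Reader, buf []byte) (written int64, err error) would be the better solution, imho. But even that is not necessary, as io.Copy (or the underlying implementation) actually allocates a buffer of 32kB.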
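As an illustration of point 3, here is a minimal sketch of copying through a caller-supplied buffer with io.CopyBuffer. The helper name copyWithStaticBuffer and the 4-byte buffer are made up for this example; the source and destination are wrapped in plain structs only so that io.CopyBuffer cannot take its WriterTo/ReaderFrom shortcut and really uses the buffer.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// copyWithStaticBuffer copies src into an in-memory destination through the
// caller-supplied buffer and returns what arrived. The name is hypothetical.
func copyWithStaticBuffer(src string, buf []byte) (string, error) {
	var dst bytes.Buffer
	// Wrapping both ends hides their WriterTo/ReaderFrom methods, so
	// io.CopyBuffer has to stage every chunk in buf instead of bypassing it.
	_, err := io.CopyBuffer(
		struct{ io.Writer }{&dst},
		struct{ io.Reader }{strings.NewReader(src)},
		buf,
	)
	return dst.String(), err
}

func main() {
	// Deliberately tiny buffer; it is reused for every chunk.
	// Note: io.CopyBuffer panics if buf is non-nil but has length zero.
	buf := make([]byte, 4)
	out, err := copyWithStaticBuffer("hello, named pipe", buf)
	fmt.Println(out, err)
}
```

With an *os.File on both ends the same call applies, although the runtime may still pick an optimized path that skips the buffer.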

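My approach

A better solution is to wait for a write to happen and then immediately copy the contents of the named pipe to the destination file. On most systems, there is some sort of notification for filesystem events. The package github.com/rjeczalik/notify can be used to access the events we are interested in, since write events work cross-platform on most of the important operating systems. The other event of interest to us is the removal of the named pipe, since we would then have nothing left to read from.

Hence, my solution would be: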

package main

import (
	"flag"
	"io"
	"log"
	"os"

	"github.com/rjeczalik/notify"
)

const (
	MAX_CONCURRENT_WRITERS = 5
)

var (
	pipePath string
	filePath string
)

func init() {
	flag.StringVar(&pipePath, "pipe", "", "/path/to/named_pipe to read from")
	flag.StringVar(&filePath, "file", "out.txt", "/path/to/output file")
	log.SetOutput(os.Stderr)
}

func main() {
	flag.Parse()

	var p, f *os.File
	var err error
	var e notify.EventInfo

	// The usual stuff: checking whether the named pipe exists etc.
	if p, err = os.Open(pipePath); os.IsNotExist(err) {
		log.Fatalf("Named pipe '%s' does not exist", pipePath)
	} else if os.IsPermission(err) {
		log.Fatalf("Insufficient permissions to read named pipe '%s': %s", pipePath, err)
	} else if err != nil {
		log.Fatalf("Error while opening named pipe '%s': %s", pipePath, err)
	}
	// Yep, there and readable. Close the file handle on exit.
	defer p.Close()

	// Do the same for the output file.
	if f, err = os.OpenFile(filePath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600); os.IsNotExist(err) {
		log.Fatalf("File '%s' does not exist", filePath)
	} else if os.IsPermission(err) {
		log.Fatalf("Insufficient permissions to open/create file '%s' for appending: %s", filePath, err)
	} else if err != nil {
		log.Fatalf("Error while opening file '%s' for writing: %s", filePath, err)
	}
	// Again, close the file handle on exit.
	defer f.Close()

	// Here is where it happens. We create a buffered channel for events which might happen
	// on the file. The reason why we make it buffered to the number of expected concurrent writers
	// is that if all writers would (theoretically) write at once or at least pretty close
	// to each other, it might happen that we lose events. This is due to the underlying
	// implementations, not because of Go.
	c := make(chan notify.EventInfo, MAX_CONCURRENT_WRITERS)

	// Here we tell notify to watch our named pipe for events, Write and Remove events
	// specifically. We watch for Remove events, too, as this removes the file handle we
	// read from, making reads impossible.
	if err = notify.Watch(pipePath, c, notify.Write|notify.Remove); err != nil {
		log.Fatalf("Error while setting up the watch on '%s': %s", pipePath, err)
	}

	// We start an infinite loop...
	for {
		// ...waiting for an event to be passed.
		e = <-c

		switch e.Event() {
		case notify.Write:
			// If it is a write event, we copy the content of the named pipe to
			// our output file and wait for the next event to happen.
			// Note that this is idempotent: even if we have huge writes by multiple
			// writers on the named pipe, the first call to Copy will copy the contents.
			// The time to copy that data may well be longer than it takes to generate the events.
			// However, subsequent calls may copy nothing, but that does not do any harm.
			if _, err = io.Copy(f, p); err != nil {
				log.Printf("Error while copying from '%s' to '%s': %s", pipePath, filePath, err)
			}
		case notify.Remove:
			// Some user or process has removed the named pipe,
			// so we have nothing left to read from.
			// We should inform the user and quit.
			log.Fatalf("Named pipe '%s' was removed. Quitting", pipePath)
		}
	}
}

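A named pipe reader will receive EOF once there are no writers left. The solution outside of this code is to make sure there is always one writer process holding the file descriptor, though it does not need to write anything.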
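The EOF behaviour can be reproduced in a few lines. The sketch below uses an anonymous pipe from os.Pipe instead of a named one, on the assumption that the end-of-file rule is the same for both; the helper name readUntilEOF is made up for the example, and io.ReadAll needs Go 1.16 or later.

```go
package main

import (
	"fmt"
	"io"
	"os"
)

// readUntilEOF writes msg into an anonymous pipe, closes the only write end,
// and then reads everything up to EOF. The name is hypothetical.
func readUntilEOF(msg string) (string, error) {
	r, w, err := os.Pipe()
	if err != nil {
		return "", err
	}
	defer r.Close()

	// msg is assumed to be small enough to fit into the pipe buffer,
	// otherwise this write would block because nobody is reading yet.
	if _, err := w.WriteString(msg); err != nil {
		w.Close()
		return "", err
	}
	// Closing the last write end is what lets the reader see EOF.
	if err := w.Close(); err != nil {
		return "", err
	}

	// io.ReadAll returns once EOF is reached and does not report EOF as an error.
	data, err := io.ReadAll(r)
	return string(data), err
}

func main() {
	got, err := readUntilEOF("ping")
	fmt.Println(got, err)
}
```

If the write end were kept open, the io.ReadAll call would simply block, which is exactly the effect a permanently open writer has on a named pipe.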

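In a Go program, if you want to wait for a new writer, you have to poll the io.Reader in a for loop. Your current code does this with a busy loop, which will consume 100% of one CPU core. Adding a sleep and a way to return on other errors will work around the issue: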

for {
	// io.Copy returns a nil error when it stops at EOF.
	_, err := io.Copy(&buff, fpipe)
	if buff.Len() > 0 {
		buff.WriteTo(f)
	}
	if err != nil {
		// something other than EOF happened
		return
	}
	time.Sleep(100 * time.Millisecond)
}

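Problem: when the "last writer" closes the pipe, you get an EOF, even though new writers may appear later.

Solution: open the pipe for writing yourself and do not close it. Now you can treat the read side as a never-ending read, without ever getting an EOF. Put the following directly after the place where you open the pipe for reading: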

nullWriter, err := os.OpenFile(pipePath, os.O_WRONLY, os.ModeNamedPipe)
if err != nil {
	logger.Crit("Error opening pipe for (placeholder) write", "err", err)
}
defer nullWriter.Close()
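For completeness, here is a minimal end-to-end sketch of the placeholder-writer idea. It is Unix-only, since it creates the FIFO with syscall.Mkfifo, and the names openWithPlaceholder and demo as well as the temporary FIFO path are invented for the example. Because opening a FIFO blocks until the other end is opened too, the sketch opens the read end in a goroutine while the main flow opens the placeholder write end.

```go
package main

import (
	"fmt"
	"io"
	"os"
	"path/filepath"
	"syscall"
)

// openWithPlaceholder opens the FIFO at path for reading plus a write end
// that is never written to, so the writer count never drops to zero.
func openWithPlaceholder(path string) (reader, placeholder *os.File, err error) {
	type result struct {
		f   *os.File
		err error
	}
	ch := make(chan result, 1)
	// The read-only open blocks until a writer exists, so run it concurrently.
	go func() {
		f, err := os.OpenFile(path, os.O_RDONLY, 0)
		ch <- result{f, err}
	}()
	// The write-only open blocks until a reader exists; both unblock together.
	placeholder, err = os.OpenFile(path, os.O_WRONLY, 0)
	if err != nil {
		return nil, nil, err
	}
	res := <-ch
	if res.err != nil {
		placeholder.Close()
		return nil, nil, res.err
	}
	return res.f, placeholder, nil
}

// demo lets two short-lived writers come and go on a temporary FIFO and
// returns everything the reader received.
func demo() (string, error) {
	dir, err := os.MkdirTemp("", "fifo-demo")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)
	path := filepath.Join(dir, "pipe")
	if err := syscall.Mkfifo(path, 0600); err != nil {
		return "", err
	}
	r, placeholder, err := openWithPlaceholder(path)
	if err != nil {
		return "", err
	}
	defer r.Close()
	defer placeholder.Close()

	var received string
	for _, msg := range []string{"hello ", "world"} {
		// A real writer opens the pipe, writes, and closes it again.
		w, err := os.OpenFile(path, os.O_WRONLY, 0)
		if err != nil {
			return "", err
		}
		if _, err := w.WriteString(msg); err != nil {
			w.Close()
			return "", err
		}
		if err := w.Close(); err != nil {
			return "", err
		}
		// Read exactly what this writer sent.
		buf := make([]byte, len(msg))
		if _, err := io.ReadFull(r, buf); err != nil {
			return "", err
		}
		received += string(buf)
	}
	return received, nil
}

func main() {
	out, err := demo()
	if err != nil {
		fmt.Fprintln(os.Stderr, "demo failed:", err)
		os.Exit(1)
	}
	fmt.Println(out)
}
```

In a long-running program the per-message reads would be replaced by a single blocking io.Copy from the reader to the output file; with the placeholder held open that call just waits for data instead of returning at EOF, so no sleep or busy loop is needed.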
