仅读取一行时,命名管道内容将被丢弃



我可能误解了什么,但命名管道应该这样吗?

# consumer:
while True:
queue: int = os.open('pipe', flags=os.O_RDONLY | os.O_NONBLOCK)
with os.fdopen(queue, 'rb') as stream:
readers, _, _ = select([stream], [], [])
if readers:
reader = readers.pop()
contents: bytes = reader.readline().strip()
if b'quit' == contents:
break
print(contents)
# producer:
fd = os.open(pipe, os.O_WRONLY | os.O_NONBLOCK)
ps = open(fd, 'wb')
for i in range(10):
ps.write(str(i).encode())
ps.write(os.linesep.encode())
ps.close()

我可以看到所有数据都被写入管道,一旦文件关闭,使用者中的select就会拾取它并开始读取。。。这是输出:

b'0'

管道的其余部分都被丢弃了,就像它从未出现过一样。这是预期的行为吗?我的期望是打印:

b'0'
b'1'
...
b'9'

我想使用命名管道进行进程间通信。脚本A正在向独立脚本B发送一个命令,然后可能再发送三个。B应该拿起这些命令,一个接一个地执行。因此,上面的代码。然而,只有第一个被执行,其余的都不见了。命令并不像上面的例子那样出现。

  • cmd1+cmd2
  • 10秒后
  • cmd3
  • 5秒后
  • cmd4+cmd5+cmd6
  • 20秒后
  • 退出<-此时脚本B终止

我该如何做到这一点?

出于某种神奇的原因,select第一次挂起。

write(hello)
write(newline)
write(world)
flush()
print(hello) <- select hangs
... one eternity later ...
write(how)
write(newline)
write(are)
write(newline)
write(ya)
flush()
print(world) <- This should have been printed also without the new writes...
print(how) <- there were no new writes yet select didn't block as there was new data available for reading
print(are)
print(ya)

通过在select语句中添加超时值1,我不需要伪写来读取管道的其余部分。不确定这是否是一个选择限制,但肯定看起来很可疑,尤其是从第二次它按预期工作开始。

更新#1

问题是,使用者在每次选择之间都会关闭和重新打开管道。它只消耗了一行,在关闭时,管道中等待读取的所有其他数据都消失了。

因此,应将while True替换为fdopen。此外,您应该在使用者中正确处理EOF(这是第一个,额外的while True的来源(。这里是固定代码:

消费者

import os, sys
import select
while True:
print("Opening pipe")
queue: int = os.open('pipe', flags=os.O_RDONLY)
with os.fdopen(queue, 'rb') as stream:
while True:
readers, _, _ = select.select([stream], [], [])
if readers:
reader = readers.pop()
contents: bytes = reader.readline()
if contents is b'':
print("EOF detected")
break
contents: bytes = contents.strip()
if b'quit' == contents:
sys.exit(0)
print(contents)

生产者

生产者缺少刷新调用:

import os
fd = os.open('pipe', os.O_WRONLY)
ps = open(fd, 'wb')
for i in range(10):
ps.write(str(i).encode())
ps.write(os.linesep.encode())
ps.flush()
ps.close()

分析代码的很大帮助是在strace下运行consumer,它显示了所有系统调用,我可以注意到选择s之间关闭s。


陈旧过时的答案

行中:

reader = readers.pop()

您从管道中获得的内容不仅仅是第一个字节。但你们只打印出你们得到的第一个字节。

当你在消费者代码的末尾添加另一个从阅读器读取的代码时,请注意:

contents2: bytes = reader.readline().strip()
print(contents2)

您将获得:

b'0'
b'1'

当然,完整的代码应该测试它从管道中得到了多少。打印它,然后等待管道中出现更多数据。

这是我更新的消费者代码:

import os, sys
import select
while True:
queue: int = os.open('pipe', flags=os.O_RDONLY | os.O_NONBLOCK)
with os.fdopen(queue, 'rb') as stream:
readers, _, _ = select.select([stream], [], [])
if readers:
reader = readers.pop()
while True:
contents: bytes = reader.readline().strip()
if contents is b'':
break
if b'quit' == contents:
sys.exit(0)
print(contents)

最新更新