无法从外部进程读取 PTY(伪终端文件)



我正在打开一个PTY(在Python/Linux中(并对其进行写入。我可以通过minicom完美地读取它。但是,我无法在另一个Python(或C++(程序中阅读它。下面是一个最小化的例子:

producer.py(打开pty/写入(:

import os, sys
from time import sleep
master_fd, slave_fd = os.openpty()
print "minicom -D %s" % os.ttyname( slave_fd )
for i in range(0,30): 
    d = str(i % 10)
    os.write( master_fd, d )
    sys.stdout.write( d )
    sys.stdout.flush()
    sleep( 2 )
os.close( slave_fd )
os.close( master_fd )
print "nDone"    

consumer.py(尝试打开/读取(:

import os, sys
from time import sleep
pts=raw_input("Enter pts number:")
while True:
    fd=0
    try:
        fd=os.open('/dev/pts/%d' % (pts,), 
            os.O_RDONLY | os.O_NONBLOCK )
        sys.stdout.write( os.read(fd, 1 ) )  
        sys.stdout.flush()       
    except Exception as e: print e
    if fd: os.close(fd)    
    sleep(1)        

读取的结果总是:

[Erno 11]资源暂时不可用

如果我在阻塞模式下读取,它只会阻塞,直到生产者终止。然后,它说该文件不存在。

我花了好几天的时间试图设置各种模式、权限、锁等,但似乎没有什么能让我取得进展。这种东西很容易与常规文件配合使用。此外,请再次注意minicom可以顺利读取pty。此外,使用lsof,我可以看到minicom和我的consumer.py脚本确实打开了文件——在python示例中,读取操作不起作用。那么,微型通信的秘密是什么呢?我试着在minicom源代码中找到这样的东西,但我没有成功找到任何可以使用的东西。

理想情况下,我的生产者会让它更容易打开和阅读(就像在我的消费者示例中一样(,但如果我能看到这项工作,我愿意修改任何一端。。。

是什么让您认为无法打开PTY?您的代码中没有提供有关哪个系统调用失败的信息。

最有可能的情况是os.read()调用失败,错误代码为EAGAIN(也称为EWOULDBLOCK(,因为您在非阻塞模式下打开了PTY。没有数据可读取,因为PTY是tty,并且tty最初处于"熟"模式,这意味着在发送行尾字符或某个中断字符之前,不会将任何输入传递给使用者。Minicom可能会通过termios调用将pty置于"原始"模式,您也应该这样做。

我想你并不是真的想把PTY设置为非阻塞模式。除非您设置了事件轮询或选择循环,否则您将重复获得EAGAIN"错误"(这些错误并不是真正的错误(,并且您真的不想等待整整一秒钟再重试。(您也不想关闭并重新打开PTY。(最好将PTY保持在阻塞模式,但将其配置为在每次击键时立即返回(再次使用termios(。

我的主要挂断是在pty设置中。请参阅我在@rici回答下的评论。

修订生产商.py:

import os, sys
from time import sleep
import fcntl
import termios 
# create a new Psdeuo Terminal (pty), with a dynamically 
# assigned path to it, and display the minicom command to 
# open it as a test consumer
master_fd, slave_fd = os.openpty()
print "minicom -D %s" % os.ttyname( slave_fd )
# termios attribute index constants
iflag  = 0
oflag  = 1
cflag  = 2
lflag  = 3
ispeed = 4
ospeed = 5
cc     = 6
# get current pty attributes
termAttr = termios.tcgetattr( master_fd )
# disable canonical and echo modes       
termAttr[lflag] &= ~termios.ICANON & ~termios.ECHO
# disable interrupt, quit, and suspend character processing 
termAttr[cc][termios.VINTR] = 'x00' 
termAttr[cc][termios.VQUIT] = 'x00'
termAttr[cc][termios.VSUSP] = 'x00'
# set revised pty attributes immeaditely
termios.tcsetattr( master_fd, termios.TCSANOW, termAttr )
# enable non-blocking mode on the file descriptor
flags = fcntl.fcntl( master_fd, fcntl.F_GETFL ) 
flags |= os.O_NONBLOCK               
fcntl.fcntl( master_fd, fcntl.F_SETFL, flags )
# write some example data for a couple of minutes
for i in range(0,60): 
    d = str(i % 10)
    os.write( master_fd, d )
    sys.stdout.write( d )
    sys.stdout.flush()
    sleep( 2 )
# release the resources     
os.close( slave_fd )
os.close( master_fd )
print "nDone"

修改后的consumer.py:

import os, sys
from time import sleep
from errno import EAGAIN, EBUSY
ERRORS_TO_IGNORE = [EAGAIN, EBUSY]
# the PTS will be dynamically assigned to the producer,
# so the consumer needs to have that supplied
pts=raw_input("Enter pts number:")
fd=None
print "Press Ctrl+Z to exit"
while True:
    sleep(1)      
    # if the pty is not open, attempt to open it
    # in readonly, non-blocking mode
    try:
        if not fd:
            fd=os.open('/dev/pts/%s' % (pts,), 
                       os.O_RDONLY | os.O_NONBLOCK )
    except Exception as e:
        print e
        if fd: fd = os.close(fd)     
        continue         
    # attempt to read/pop a character from the stream
    # and then display it in this terminal                 
    try:        
        c = os.read( fd, 1 )
        sys.stdout.write( str(c) )  
        sys.stdout.flush()
    except Exception as e:
        # ignore some "normal" / "race condition" errors
        if( isinstance(e, OSError) and
            e.errno in ERRORS_TO_IGNORE ):pass
        else : print e

最新更新