如果我不使用子进程,如何使用python读取stdrr?



我正在使用cwiid库,这是一个用C编写的库,但用python使用。该库允许我使用 Wiimote 来控制机器人上的一些电机。该代码在没有显示器、键盘或鼠标的嵌入式设备上作为守护程序运行。

当我尝试初始化对象时:

import cwiid
while True:
try:
wm = cwiid.Wiimote()
except RuntimeError:
# RuntimeError exception thrown if no Wiimote is trying to connect
# Wait a second
time.sleep(1)
# Try again
continue

99%的时间,一切正常,但偶尔,库会进入某种奇怪的状态,调用cwiid.Wiimote()导致库向stderr写入"套接字连接错误(控制通道)">,并且python抛出异常。发生这种情况时,每次后续对cwiid.Wiimote()调用都会导致相同的内容写入 stderr,并在我重新启动设备之前引发相同的异常。

我想做的是检测这个问题,并让python自动重新启动设备。

如果 cwiid 库处于奇怪状态,它抛出的异常类型也是RuntimeError,这与连接超时异常(这很常见)没有什么不同,所以我似乎无法以这种方式区分它。我想做的是在运行cwiid.Wiimote()后立即读取stderr,以查看是否出现消息"套接字连接错误(控制通道)">,如果是,请重新启动。

到目前为止,我可以使用一些os.dup()os.dup2()方法来重定向 stderr 以防止消息显示,但这似乎对我阅读 stderr 没有帮助。

大多数在线示例都涉及读取 stderr,如果您正在运行带有子进程的内容,这不适用于这种情况。

我怎样才能读取 stderr 来检测正在写入它的消息?

我想我正在寻找的是这样的:

while True:
try:
r, w = os.pipe()
os.dup2(sys.stderr.fileno(), r)
wm = cwiid.Wiimote()
except RuntimeError:
# RuntimeError exception thrown if no Wiimote is trying to connect
if ('Socket connect error (control channel)' in os.read(r, 100)):
# Reboot
# Wait a second
time.sleep(1)
# Try again
continue

这似乎没有按照我认为的方式工作。

作为与 stderr 战斗的替代方法,在放弃之前快速连续重试几次(应该处理连接错误)的以下方法怎么样:

while True:
for i in range(50):  # try 50 times
try:
wm = cwiid.Wiimote()
break        # break out of "for" and re-loop in "while"
except RuntimeError:
time.sleep(1)
else:
raise RuntimeError("permanent Wiimote failure... reboot!")

在后台,除了 dups 之外,subprocess还使用匿名管道来重定向子进程输出。 要让进程读取自己的标准,您需要手动执行此操作。 它涉及获取匿名管道,将标准错误重定向到管道的输入,运行有问题的stderr写入操作,从管道的另一端读取输出,以及清理所有内容。 这一切都很繁琐,但我想我在下面的代码中做对了。

cwiid.Wiimote调用的以下包装器将返回一个元组,其中包含函数调用返回的结果(NoneRuntimeError的情况下)和生成的 stderr 输出(如果有)。 例如,请参阅tests函数,了解它在各种条件下应该如何工作。 我尝试了调整您的示例循环,但不太了解当cwiid.Wiimote调用成功时会发生什么。 在您的示例代码中,您只需立即重新循环。

编辑:哎呀! 修复了example_loop()中调用Wiimote而不是作为参数传递的错误。

import time
import os
import fcntl
def capture_runtime_stderr(action):
"""Handle runtime errors and capture stderr"""
(r,w) = os.pipe()
fcntl.fcntl(w, fcntl.F_SETFL, os.O_NONBLOCK)
saved_stderr = os.dup(2)
os.dup2(w, 2)
try:
result = action()
except RuntimeError:
result = None
finally:
os.close(w)
os.dup2(saved_stderr, 2)
with os.fdopen(r) as o:
output = o.read()
return (result, output)
## some tests
def return_value():
return 5
def return_value_with_stderr():
os.system("echo >&2 some output")
return 10
def runtime_error():
os.system("echo >&2 runtime error occurred")
raise RuntimeError()
def tests():
print(capture_runtime_stderr(return_value))
print(capture_runtime_stderr(return_value_with_stderr))
print(capture_runtime_stderr(runtime_error))
os.system("echo >&2 never fear, stderr is back to normal")
## possible code for your loop
def example_loop():
while True:
(wm, output) = capture_runtime_stderr(cmiid.Wiimote)
if wm == None:
if "Socket connect error" in output:
raise RuntimeError("library borked, time to reboot")
time.sleep(1)
continue
## do something with wm??

相关内容

  • 没有找到相关文章

最新更新