我想运行一个命令,并在运行时将结果存储在Redis中。
尽管演示的命令是ls/etc/strong>,但在现实生活中,我希望将其用于长时间运行的进程。
我已经写了一些演示代码来展示这个想法。
不幸的是,这段代码在运行时坚持使用fileno,并且不起作用,即使我模拟了一个。我怎样才能做到这一点?
import subprocess
import redis
class RedisFile:
def __init__(self, key):
self.key = key
self.redis = redis.StrictRedis()
print("inited RedisFile with key:", key)
def write(self, value):
self.redis.append(self.key, value)
def main():
out = RedisFile("out")
error = RedisFile("error")
proc = subprocess.Popen(["ls", "/etc"],
stdout=out,
stderr=error,
bufsize=0
)
main()
您应该用自定义stdout替换sys.stdout
例如:
from unittest.mock import patch
with patch('sys.stdout', your_stdout),patch('sys.stderr', your_stderr):
do_something