out从扭曲的进程协议接收,如果接收得太快,则合并消息(缓冲问题?



我正在使用Klein,一个基于扭曲的微Web框架。我有一个服务器(在Windows上运行!(,它将通过reactor.spawnProcess((生成一个外部长时间运行的进程(端到端测试(。 为了发送有关正在运行的测试的状态信息,我实现了一个进程协议:

class IPCProtocol(protocol.ProcessProtocol):
def __init__(self, status: 'Status', history: 'History'):
super().__init__()
self.status: Status = status
self.history: History = history
self.pid = None
def connectionMade(self):
self.pid = self.transport.pid
log.msg("process started, pid={}".format(self.pid))
def processExited(self, reason):
log.msg("process exited, status={}".format(reason.value.exitCode))
# add current run to history
self.history.add(self.status.current_run)
# create empty testrun and save status
self.status.current_run = Testrun()
self.status.status = StatusEnum.ready
self.status.save()
# check for more queue items
if not self.status.queue.is_empty():
start_testrun()
def outReceived(self, data: bytes):
data = data.decode('utf-8').strip()
if data.startswith(constants.LOG_PREFIX_FAILURE):
self.failureReceived()
if data.startswith(constants.LOG_PREFIX_SERVER):
data = data[len(constants.LOG_PREFIX_SERVER):]
log.msg("Testrunner: " + data)
self.serverMsgReceived(data)

我使用以下命令开始该过程:

ipc_protocol = IPCProtocol(status=app.status, history=app.history)
args = [sys.executable, 'testrunner.py', next_entry.suite, json.dumps(next_entry.testscripts)]
log.msg("Starting testrunn.py with args: {}".format(args))
reactor.spawnProcess(ipc_protocol, sys.executable, args=args)

要发送信息,我只需在 testrunner.py 中打印出消息(带有前缀以区分它们(。

问题是,如果我将打印命令发送到 fast,那么outReceived将合并消息。

我已经尝试为外部进程中的print()调用添加flush=True,但这并没有解决问题。其他一些问题建议使用usePTY=True作为 spawnProcess,但这在 Windows 下不受支持。 有没有比为每个print()呼叫添加小延迟(如time.sleep(0.1)(更好的方法来解决此问题?

你没有说,但似乎子进程将行写入其标准输出。 如果要在这些行上操作,则需要解析输出以查找行边界。

您可以使用LineOnlyReceiver来帮助您解决此问题。 由于进程不是流传输,因此不能直接使用LineOnlyReceiver。 您必须使其适应进程协议接口。 您可以自己执行此操作,也可以使用ProcessEndpoint(而不是spawnProcess(为您执行此操作。

例如:

from twisted.protocols.basic import LineOnlyReceiver
from twisted.internet.protocol import Factory
from twisted.internet.endpoints import ProcessEndpoint
from twisted.internet import reactor
endpoint = ProcessEndpoint(reactor, b"/some/some-executable", ...)
spawning_deferred = endpoint.connect(Factory.forProtocol(LineOnlyReceiver))
...

最新更新