如何在扭曲的[Autobahn] Websocket服务器中实时流式传输输出



我想使用subprocess.popen((执行C程序,然后将其实时输出并将其发送给客户端。但是,输出是缓冲的,并在执行结束时一起发送(阻止性质(。如何实时收到输出,然后立即以扭曲的Autobahn发送。

def onConnect(self, request):
    try:
        self.cont_name = ''.join(random.choice(string.lowercase) for i in range(5)) 
        self.file_name = self.cont_name
        print("Connecting...")
    except Exception:
        print("Failed"+str(Exception))    
def onOpen(self):
    try:
        print("open")
    except Exception:
        print("Couldn't create container")
def onMessage(self, payload,isBinary=False):
        cmd = "docker exec "+self.cont_name+" /tmp/./"+self.file_name
        a = subprocess.Popen([cmd], shell=True, stdout=subprocess.PIPE, bufsize=1)
        for line in iter(a.stdout.readline, b''):
            line = line.encode('utf8')
            self.sendMessage(line)
def onClose(self, wasClean, code, reason):
    try:
        print("Closed container...")
    except Exception:
        print(str(Exception))    

使用子程序执行Docker命令时,C代码的整个输出将立即返回而不是发生。for ex:

#include <stdio.h>
#include <unistd.h>
int main(){
int i=0;
for(i=0;i<5;i++){
    fflush(stdout);
    printf("Roundedn");
    sleep(3);
}
}

在容器中运行此操作后,该程序应在3秒后返回"四舍五入"到客户端。但是,它最终在执行结束时发送了所有"圆形"。

不当行为来自此方法的循环:

def onMessage(self, payload,isBinary=False):
        cmd = "docker exec "+self.cont_name+" /tmp/./"+self.file_name
        a = subprocess.Popen([cmd], shell=True, stdout=subprocess.PIPE, bufsize=1)
        for line in iter(a.stdout.readline, b''):
            line = line.encode('utf8')
            self.sendMessage(line)

Twisted是一个合作的多任务系统。默认情况下,所有内容都在单个线程("反应堆线程"(中运行。这意味着所有代码必须定期(通常很快(放弃控制权,以便其他代码(应用程序代码或扭曲实施代码(都有机会运行。此功能中的循环从子过程中读取,并尝试使用Autobahn API发送数据 - 一遍又一遍地,永远不要放弃控制。

阻止从流行对象的读取也可能引起问题。您将不知道读取会阻止多长时间,因此您不知道会阻止其他代码在反应堆线程中运行多长时间。您可以将popen读取到新线程中,它们不会阻止反应堆线程:

def onMessage(self, payload,isBinary=False):
    cmd = "docker exec "+self.cont_name+" /tmp/./"+self.file_name
    popen_in_thread(
        lambda line: reactor.callFromThread(
            lambda: self.sendMessage(line.encode("utf-8"))
        ),
        [cmd], shell=True, stdout=subprocess.PIPE, bufsize=1
    )
def popen_in_thread(callback, *args, **kwargs):
    def threaded():
        a = subprocess.Popen(*args, **kwargs)
        for line in iter(a.stdout.readline, b''):
            callback(line)
    reactor.callInThread(threaded)

或更好地使用Twisted自己的过程支持:

def onMessage(self, payload,isBinary=False):
    class ProcessLinesToMessages(ProcessProtocol):
        def outReceived(self, output):
            buf = self.buf + output
            lines = buf.splitlines()
            self.buf = lines.pop()
            for line in lines:
                self.sendMessage(line.encode("utf-8"))
            while True:
                line, self.buf = self.buf.splitline
    reactor.spawnProcess(
        ProcessLinesToMessages(),
        "docker",
        [
            "docker",
            "exec",
            self.cont_name,
            "/tmp/./ + self.file_name,
        ],
    )

(都没有测试过版本,希望这个想法很清楚(

最新更新