subprocess.Popen("ssh host@remote cmd") 失败



我是 Python subprocess 的新手。当我想用 subprocess.Popen.communicate 与 shell 命令 "net ads join -U administrator" 交互、自动输入密码时,它的输出并没有被重定向到管道,而是仍然出现在 stdout 上。我的代码如下。

import subprocess
import os
import sys
# Password to feed to the command's prompt, kept as raw bytes.
passwd = b'123456'
cmd = "net ads join -U administrator"
# One line of stdin: the password followed by a newline.  (Formatting the
# bytes object through str() would send the literal text "b'123456'".)
s1 = passwd + b'\n'
# NOTE(review): a program that reads its password from the controlling
# terminal instead of stdin will ignore this pipe; the prompt that still
# shows up on the console suggests that is the case here -- confirm.
p1 = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE,
                      stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                      close_fds=True)
# communicate() writes s1 to stdin, closes it, and collects both streams.
out, err = p1.communicate(s1)
print(out)

当我运行测试时,communicate 不起作用,输出仍然出现在 stdout 中。

$ python ads.py 
Enter administrator's password:

当我将 cmd 改为 "ssh root@ip ls /" 时,也遇到了同样的问题。

我尝试了PDB,但是问题似乎不在这里。有任何建议来解决这个问题吗?谢谢

这是我在一个脚本中使用的封装类,用于在伪终端中通过 ssh 登录并运行命令。

import pty
import os 
import re
class BaseError(Exception):
    """Common base class for the exceptions defined in this script."""
class SSHError(BaseError):
    """Raised when the ssh/scp conversation fails (e.g. rejected password)."""
class Ssh:
    """Run ``ssh`` / ``scp`` inside a pseudo-terminal and answer their prompts.

    The child process is started with ``pty.fork()`` so that prompts which
    ssh writes to its terminal can be read, and answers written back,
    through the pty master file descriptor.

    Host key checking is switched off on the command line
    (``StrictHostKeyChecking=no`` with ``UserKnownHostsFile=/dev/null``),
    so the identity of the remote host is not verified.
    """

    # Maximum number of bytes fetched by a single os.read() call.
    _CHUNK = 2024

    def __init__(self, ip, passwd, user, port):
        """Store the connection parameters.

        ip     -- host name or address of the remote machine (str)
        passwd -- password typed at the password prompt (str)
        user   -- remote login name (str)
        port   -- remote SSH port (int)
        """
        self.ip = ip
        self.passwd = passwd
        self.user = user
        self.port = port

    def _spawn(self, *argv):
        """Fork a child on a new pty and exec ``argv`` in it.

        Returns ``(pid, fd)`` in the parent, where ``fd`` is the pty master.
        """
        (pid, f) = pty.fork()
        if pid == 0:
            try:
                os.execlp(argv[0], *argv)
            finally:
                # exec only returns by raising.  Never let the forked child
                # continue running the caller's code.
                os._exit(127)
        return (pid, f)

    def _send(self, f, text):
        """Write ``text`` to the pty; os.write() requires bytes."""
        os.write(f, text.encode("utf-8"))

    def run_cmd(self, c):
        """Start ``ssh user@ip c`` on a pty and return ``(pid, fd)``."""
        # Each option and its value are separate argv entries, exactly as a
        # shell would split them.
        return self._spawn(
            "ssh",
            "-o", "ConnectTimeout=10",
            "-o", "StrictHostKeyChecking=no",
            "-o", "UserKnownHostsFile=/dev/null",
            "-q",
            "-p", "%d" % self.port,
            self.user + "@" + self.ip,
            c,
        )

    def run_scp(self, src, dst):
        """Start a recursive ``scp src user@ip:dst`` and return ``(pid, fd)``."""
        return self._spawn(
            "scp",
            "-o", "StrictHostKeyChecking=no",
            "-o", "UserKnownHostsFile=/dev/null",
            "-q",
            "-P", "%d" % self.port,
            "-r",
            src,
            self.user + "@" + self.ip + ":" + dst,
        )

    def read(self, f):
        """Read one chunk from ``f`` and return it as text.

        Returns ``""`` at end of file or when the descriptor can no longer
        be read (os.read() raising OSError), so callers can treat an empty
        string as "no more output".
        """
        try:
            data = os.read(f, self._CHUNK)
        except OSError:
            return ""
        # "replace" keeps a multi-byte character that was split across two
        # chunks from raising.
        return data.decode("utf-8", "replace")

    def _converse(self, f):
        """Answer the host-key and password prompts, then collect output."""
        chunk = self.read(f)

        if "authenticity of host" in chunk:
            self._send(f, "yes\n")
            chunk = self.read(f)
            # Skip everything up to the confirmation line or the password
            # prompt.  Stopping on an empty read avoids spinning forever
            # when the child has gone away or never prints the confirmation.
            while chunk and not re.search("Permanently added|assword:", chunk):
                chunk = self.read(f)
            if chunk and "assword:" not in chunk:
                # Only the confirmation was seen; move on to the next chunk.
                chunk = self.read(f)

        if "assword:" in chunk:
            self._send(f, self.passwd + "\n")
            reply = self.read(f) + self.read(f)
            # Either an explicit rejection or a repeated password prompt
            # means the password was not accepted.
            if re.search("Permission denied|[pP]assword", reply):
                raise SSHError("Invalid passwd")
            # passwd was accepted
            chunk = reply

        parts = []
        while chunk:
            parts.append(chunk)
            chunk = self.read(f)
        return "".join(parts)

    def sshOutputs(self, pid, f):
        """Drive the conversation on pty ``f`` and return the child's output.

        pid -- process id of the child, as returned by run_cmd()/run_scp()
        f   -- pty master file descriptor belonging to that child

        Returns everything read from the pty after the prompts, as a str.
        Raises SSHError("Invalid passwd") when the password is rejected.
        The descriptor is closed and the child is waited for in every case.
        """
        try:
            return self._converse(f)
        finally:
            # Close the pty before waiting: on the error path the child may
            # still be sitting at a prompt, and waiting first could block.
            os.close(f)
            os.waitpid(pid, 0)

    def exe_ssh_cmd(self, c):
        """Run command ``c`` on the remote host and return its output."""
        (pid, f) = self.run_cmd(c)
        return self.sshOutputs(pid, f)

    def exe_scp_cmd(self, src, dst):
        """Copy ``src`` to ``dst`` on the remote host; return scp's output."""
        (pid, f) = self.run_scp(src, dst)
        return self.sshOutputs(pid, f)

相关内容

最新更新