Python集成测试:使用Paramiko伪造SSH服务器,并记录给出的命令



这是上下文:我正在编写一个过程,该过程通过SSH(带有Paramiko(连接到遥远的服务器,执行一些命令,然后将命令的结果返回到该过程。它尽我所能。

我现在正在尝试通过伪造SSH服务器进行"端到端"测试,以检查是否发送了SSH命令,并将其resulkt还给了该过程。

我找到了使用Paramiko的假SSH服务器,当我通过真实的SSH客户端连接到它时,它可以很好地工作,但是当我尝试与Paramiko SSH客户端连接到它时,我会出现错误:/

您对我做错了什么有任何了解吗?

这是我的代码:

服务器:

#!/usr/bin/env python
"""Fake SSH Server Utilizing Paramiko"""
import threading
import socket
import sys
import traceback
import paramiko
import os
__location__ = os.path.realpath(os.path.join(os.getcwd(), os.path.dirname(__file__)))
LOG = open("log.txt", "a")
#HOST_KEY = paramiko.RSAKey(filename='keys/private.key')
HOST_KEY = paramiko.RSAKey(filename=os.path.join(__location__, 'id_rsa.new'))
PORT = 2200

def handle_cmd(cmd, chan):
    """Branching statements to handle and prepare a response for a command"""
    response = ""
    if cmd.startswith("sudo"):
        send_ascii("sudo.txt", chan)
        return
    elif cmd.startswith("ls"):
        response = "pw.txt"
    elif cmd.startswith("version"):
        response = "Super Amazing Awesome (tm) Shell v1.1"
    elif cmd.startswith("pwd"):
        response = "/home/clippy"
    elif cmd.startswith("cd"):
        send_ascii("cd.txt", chan)
        return
    elif cmd.startswith("cat"):
        send_ascii("cat.txt", chan)
        return
    elif cmd.startswith("rm"):
        send_ascii("bomb.txt", chan)
        response = "You blew up our files! How could you???"
    elif cmd.startswith("whoami"):
        send_ascii("wizard.txt", chan)
        response = "You are a wizard of the internet!"
    elif ".exe" in cmd:
        response = "Hmm, trying to access .exe files from an ssh terminal..... Your methods are unconventional"
    elif cmd.startswith("cmd"):
        response = "Command Prompt? We only use respectable shells on this machine.... Sorry"
    elif cmd == "help":
        send_ascii("help.txt", chan)
        return
    else:
        send_ascii("clippy.txt", chan)
        response = "Use the 'help' command to view available commands"
    LOG.write(response + "n")
    LOG.flush()
    chan.send(response + "rn")

def send_ascii(filename, chan):
    """Print ascii from a file and send it to the channel"""
    with open('ascii/{}'.format(filename)) as text:
        chan.send("r")
        for line in enumerate(text):
            LOG.write(line[1])
            chan.send(line[1] + "r")
    LOG.flush()

class FakeSshServer(paramiko.ServerInterface):
    """Settings for paramiko server interface"""
    def __init__(self):
        self.event = threading.Event()
    def check_channel_request(self, kind, chanid):
        if kind == 'session':
            return paramiko.OPEN_SUCCEEDED
        return paramiko.OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
    def check_auth_password(self, username, password):
        # Accept all passwords as valid by default
        return paramiko.AUTH_SUCCESSFUL
    def get_allowed_auths(self, username):
        return 'password'
    def check_channel_shell_request(self, channel):
        self.event.set()
        return True
    def check_channel_pty_request(self, channel, term, width, height, pixelwidth, pixelheight, modes):
        return True

def start_server():
    """Init and run the ssh server"""
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind(('', PORT))
    except Exception as err:
        print('*** Bind failed: {}'.format(err))
        traceback.print_exc()
        sys.exit(1)
    while True:
        try:
            sock.listen(100)
            print('Listening for connection ...')
            client, addr = sock.accept()
        except Exception as err:
            print('*** Listen/accept failed: {}'.format(err))
            traceback.print_exc()
        LOG.write("nnConnection from: " + addr[0] + "n")
        print('Got a connection!')
        try:
            transport = paramiko.Transport(client)
            transport.add_server_key(HOST_KEY)
            # Change banner to appear legit on nmap (or other network) scans
            transport.local_version = "SSH-2.0-OpenSSH_7.6p1 Ubuntu-4ubuntu0.3"
            server = FakeSshServer()
            try:
                transport.start_server(server=server)
            except paramiko.SSHException:
                print('*** SSH negotiation failed.')
                raise Exception("SSH negotiation failed")
            # wait for auth
            chan = transport.accept(20)
            if chan is None:
                print('*** No channel.')
                raise Exception("No channel")
            server.event.wait(10)
            if not server.event.is_set():
                print('*** Client never asked for a shell.')
                raise Exception("No shell request")
            try:
                chan.send("Welcome to the my control serverrnrn")
                run = True
                while run:
                    chan.send("$ ")
                    command = ""
                    while not command.endswith("r"):
                        transport = chan.recv(1024)
                        # Echo input to psuedo-simulate a basic terminal
                        chan.send(transport)
                        command += transport.decode("utf-8")
                    chan.send("rn")
                    command = command.rstrip()
                    LOG.write("$ " + command + "n")
                    print(command)
                    if command == "exit":
                        run = False
                    else:
                        handle_cmd(command, chan)
            except Exception as err:
                print('!!! Exception: {}: {}'.format(err.__class__, err))
                traceback.print_exc()
                try:
                    transport.close()
                except Exception:
                    pass
            chan.close()
        except Exception as err:
            print('!!! Exception: {}: {}'.format(err.__class__, err))
            traceback.print_exc()
            try:
                transport.close()
            except Exception:
                pass

if __name__ == "__main__":
    start_server()

这是我尝试与paramiko连接的代码:

    sshClient = paramiko.SSHClient()
    sshClient.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    sshClient.connect('127.0.0.1', username='bnc', password='pass', port=2200)
    sshClient.exec_command('ls')

以及

    sshClient = paramiko.SSHClient()
    sshClient.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    sshClient.connect('127.0.0.1', username='bnc', password='pass', port=2200)
    channel = sshClient.get_transport().open_session()
    channel.get_pty()
    channel.invoke_shell()
    channel.send('lsrn')

在两个测试中,我都有相同的错误:

Listening for connection ...
Got a connection!
!!! Exception: <class 'socket.error'>: Socket is closed
Traceback (most recent call last):
Listening for connection ...
  File "/home/cbrunain/Projects/daa/Python/test/ssh_mock_server.py", line 152, in start_server
    chan.send(transport)
  File "/home/cbrunain/.local/share/virtualenvs/daa-hnBs4Nn2/lib/python2.7/site-packages/paramiko/channel.py", line 801, in send
    return self._send(s, m)
  File "/home/cbrunain/.local/share/virtualenvs/daa-hnBs4Nn2/lib/python2.7/site-packages/paramiko/channel.py", line 1198, in _send
    raise socket.error("Socket is closed")
error: Socket is closed
No handlers could be found for logger "paramiko.transport"

我终于找到了有效的东西:https://gist.github.com/cschwede/3e2c025408ab4af531651098331cce45

import logging
import socket
import sys
import threading
import paramiko
logging.basicConfig()
logger = logging.getLogger()
running = True
host_key = paramiko.RSAKey(filename='id_rsa')

def ssh_command_handler(command):
    print('default : ', command)

class Server(paramiko.ServerInterface):
    def __init__(self):
        self.event = threading.Event()
    def check_channel_request(self, kind, chanid):
        if kind == 'session':
            return paramiko.OPEN_SUCCEEDED
    def check_auth_publickey(self, username, key):
        return paramiko.AUTH_SUCCESSFUL
    def get_allowed_auths(self, username):
        return 'publickey'
    def check_channel_exec_request(self, channel, command):
        global running
        # This is the command we need to parse
        if command == 'exit':
            running = False
        ssh_command_handler(command)
        self.event.set()
        return True

def listener():
    print('listener')
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(('', 2222))
    sock.listen(100)
    client, addr = sock.accept()
    t = paramiko.Transport(client)
    t.set_gss_host(socket.getfqdn(""))
    t.load_server_moduli()
    t.add_server_key(host_key)
    server = Server()
    t.start_server(server=server)
    # Wait 30 seconds for a command
    server.event.wait(30)
    t.close()
    print('end listener')

def run_server(command_handler):
    global running
    global ssh_command_handler
    ssh_command_handler = command_handler
    while running:
        try:
            listener()
        except KeyboardInterrupt:
            sys.exit(0)
        except Exception as exc:
            logger.error(exc)

def run_in_thread(command_handler):
    thread = threading.Thread(target=run_server, args=(command_handler,))
    thread.start()

if __name__ == '__main__':
    run_in_thread(ssh_command_handler)

最新更新