如何统计子进程发出的每个UDP数据包?



我有一个Python应用程序,它编排对底层进程的调用。这些进程使用subprocess.check_output调用,它们对远程网络设备进行SNMP调用。

对于性能监控,我想计算发送的SNMP数据包的数量。我主要对数据包的计数感兴趣。请求/响应的数据包大小也很有趣,但不那么重要。目的是了解这个应用程序对防火墙造成的压力。

因此,为了便于讨论,让我们假设下面这个愚蠢的应用程序:

from subprocess import check_output
output = check_output(['snmpget', '-v2c', '-c', 'private', '192.168.1.1', '1.3.6.1.2.1.1.2.0'])
print(output)

这将导致在端口161上发送一个新的UDP数据包。

在这种情况下我如何计算它们?

这是另一个带有存根函数的版本(也可以是上下文管理器):

from subprocess import check_call

def start_monitoring():
    pass

def stop_monitoring():
    return 0

start_monitoring()
check_call(['snmpget', '-v2c', '-c', 'private', '192.168.1.1', '1.3.6.1.2.1.1.2.0'])
check_call(['snmpget', '-v2c', '-c', 'private', '192.168.1.1', '1.3.6.1.2.1.1.2.0'])
check_call(['snmpget', '-v2c', '-c', 'private', '192.168.1.1', '1.3.6.1.2.1.1.2.0'])
num_connections = stop_monitoring()
assert num_connections == 3

在这个人为的示例中,显然有3个调用,因为我手动执行SNMP调用。但是在实际示例中,SNMP调用的数量不等于对子流程的调用。有时会执行一个或多个get,有时是简单的遍历(即顺序UDP请求的批次),有时是批量遍历(未知数量的请求)。

所以我不能简单地监视应用程序被调用的次数。我真的必须监控UDP请求。

这样的事情可能吗?如果是,怎么做?

要知道这是在Linux上以非root用户运行的,这可能很重要。但是所有子进程都以相同的用户运行。

这可能帮助你:

https://sourceware.org/systemtap/examples/

tcpdumplike。stp为每个TCP &收到的UDP报文。每行包括源和目的IP地址、源和目的端口以及标志。

代码不是用python编写的,但是转换它没什么大不了的。

在这个答案之后,这个github repo通过另一个答案,我提出了以下UDP代理/中继的实现:

#!/usr/bin/env python
from collections import namedtuple
from contextlib import contextmanager
from random import randint
from time import sleep
import logging
import socket
import threading
import snmp

MSG_DONTWAIT = 0x40  # from socket.h
LOCK = threading.Lock()
MSG_TYPE_REQUEST = 1
MSG_TYPE_RESPONSE = 2
Statistics = namedtuple('Statistics', 'msgtype packet_size')

def visible_octets(data: bytes) -> str:
    """
    Returns a geek-friendly (hexdump)  output of a bytes object.
    Developer note:
        This is not super performant. But it's not something that's supposed to
        be run during normal operations (mostly for testing and debugging).  So
        performance should not be an issue, and this is less obfuscated than
        existing solutions.
    Example::
        >>> from os import urandom
        >>> print(visible_octets(urandom(40)))
        99 1f 56 a9 25 50 f7 9b  95 7e ff 80 16 14 88 c5   ..V.%P...~......
        f3 b4 83 d4 89 b2 34 b4  71 4e 5a 69 aa 9f 1d f8   ......4.qNZi....
        1d 33 f9 8e f1 b9 12 e9                            .3......
    """
    from binascii import hexlify, unhexlify
    hexed = hexlify(data).decode('ascii')
    tuples = [''.join((a, b)) for a, b in zip(hexed[::2], hexed[1::2])]
    line = []
    output = []
    ascii_column = []
    for idx, octet in enumerate(tuples):
        line.append(octet)
        # only use printable characters in ascii output
        ascii_column.append(octet if 32 <= int(octet, 16) < 127 else '2e')
        if (idx+1) % 8 == 0:
            line.append('')
        if (idx+1) % 8 == 0 and (idx+1) % 16 == 0:
            raw_ascii = unhexlify(''.join(ascii_column))
            raw_ascii = raw_ascii.replace(b'\n z', b'.')
            ascii_column = []
            output.append('%-50s %s' % (' '.join(line),
                                        raw_ascii.decode('ascii')))
            line = []
    raw_ascii = unhexlify(''.join(ascii_column))
    raw_ascii = raw_ascii.replace(b'\n z', b'.')
    output.append('%-50s %s' % (' '.join(line), raw_ascii.decode('ascii')))
    line = []
    return 'n'.join(output)

@contextmanager
def UdpProxy(remote_host, remote_port, queue=None):
    thread = UdpProxyThread(remote_host, remote_port, stats_queue=queue)
    thread.prime()
    thread.start()
    yield thread.local_port
    thread.stop()
    thread.join()

class UdpProxyThread(threading.Thread):
    def __init__(self, remote_host, remote_port, stats_queue=None):
        super().__init__()
        self.local_port = randint(60000, 65535)
        self.remote_host = remote_host
        self.remote_port = remote_port
        self.daemon = True
        self.log = logging.getLogger('%s.%s' % (
            __name__, self.__class__.__name__))
        self.running = True
        self._socket = None
        self.stats_queue = stats_queue
    def fail(self, reason):
        self.log.debug('UDP Proxy Failure: %s', reason)
        self.running = False
    def prime(self):
        """
        We need to set up a socket on a FREE port for this thread. Retry until
        we find a free port.
        This is used as a separate method to ensure proper locking and to ensure
        that each thread has it's own port
        The port can be retrieved by accessing the *local_port* member of the
        thread.
        """
        with LOCK:
            while True:
                try:
                    self._socket = socket.socket(socket.AF_INET,
                                                 socket.SOCK_DGRAM)
                    self._socket.bind(('', self.local_port))
                    break
                except OSError as exc:
                    self.log.warning('Port %d already in use. Shuffling...',
                                     self.local_port)
                    if exc.errno == 98:  # Address already in use
                        self.local_port = randint(60000, 65535)
                        self._socket.close()
                    else:
                        raise
    @property
    def name(self):
        return 'UDP Proxy Thread {} -> {}:{}'.format(self.local_port,
                                                     self.remote_host,
                                                     self.remote_port)
    def start(self):
        if not self._socket:
            raise ValueError('Socket was not set. Call prime() first!')
        super().start()
    def run(self):
        try:
            known_client = None
            known_server = (self.remote_host, self.remote_port)
            self.log.info('UDP Proxy set up: %s -> %s:%s',
                          self.local_port, self.remote_host, self.remote_port)
            while self.running:
                try:
                    data, addr = self._socket.recvfrom(32768, MSG_DONTWAIT)
                    self.log.debug('Packet received via %sn%s', addr,
                                   visible_octets(data))
                except BlockingIOError:
                    sleep(0.1)  # Give self.stop() a chance to trigger
                else:
                    if known_client is None:
                        known_client = addr
                    if addr == known_client:
                        self.log.debug('Proxying request packet to %sn%s',
                                       known_server, visible_octets(data))
                        self._socket.sendto(data, known_server)
                        if self.stats_queue:
                            self.stats_queue.put(Statistics(
                                MSG_TYPE_REQUEST, len(data)))
                    else:
                        self.log.debug('Proxying response packet to %sn%s',
                                       known_client, visible_octets(data))
                        self._socket.sendto(data, known_client)
                        if self.stats_queue:
                            self.stats_queue.put(Statistics(
                                MSG_TYPE_RESPONSE, len(data)))
            self.log.info('%s stopped!', self.name)
        finally:
            self._socket.close()
    def stop(self):
        self.log.debug('Stopping %s...', self.name)
        self.running = False

if __name__ == '__main__':
    logging.basicConfig(level=0)
    from queue import Queue
    stat_queue = Queue()
    with UdpProxy('192.168.1.1', 161, stat_queue) as proxied_port:
        print(snmp.get('1.3.6.1.2.1.1.2.0', '127.0.0.1:%s' % proxied_port,
                       'testing'))
    with UdpProxy('192.168.1.1', 161, stat_queue) as proxied_port:
        print(snmp.get('1.3.6.1.2.1.1.2.0', '127.0.0.1:%s' % proxied_port,
                       'testing'))
    while not stat_queue.empty():
        stat_item = stat_queue.get()
        print(stat_item)
        stat_queue.task_done()

__main__部分所示,它可以简单地如下使用:

    from queue import Queue
    stat_queue = Queue()
    with UdpProxy('192.168.1.1', 161, stat_queue) as proxied_port:
        print(snmp.get('1.3.6.1.2.1.1.2.0', '127.0.0.1:%s' % proxied_port,
                       'testing'))
    while not stat_queue.empty():
        stat_item = stat_queue.get()
        print(stat_item)
        stat_queue.task_done()

有一点需要注意:在这种情况下,snmp模块只是执行subprocess.check_output()来生成snmpget子进程。

您可以编写另一个python应用程序,以混杂模式连接到本地网络适配器。在这种模式下,您将看到通过网络适配器的所有流量。然后你可以过滤UDP流量从他们,并最终过滤数据包有一个源端口161。但这需要以root或特权用户的身份运行。

import socket
# the public network interface
HOST = socket.gethostbyname(socket.gethostname())
# create a raw socket and bind it to the public interface
s = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_IP)
s.bind((HOST, 0))
# Include IP headers
s.setsockopt(socket.IPPROTO_IP, socket.IP_HDRINCL, 1)
# receive all packages
s.ioctl(socket.SIO_RCVALL, socket.RCVALL_ON)
# receive a package
print s.recvfrom(65565)
# disabled promiscuous mode
s.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)

来源

只是一个想法,但是您可以使用UDP代理并简单地计算代理发送/接收的消息。一个简单的代理已经在SO上可用:简单的udp代理解决方案。在这里给出的解决方案中添加计数器是微不足道的。

如果需要并行运行它们,只需为每个进程使用不同的端口。否则,您可以为每个新流程重用相同的代理端口。

遗憾的是,由于您报告说您无法访问防火墙配置(我假设您也指本地防火墙—netfilter存在,无论它是否被用于任何事情),因此您可以使用的选项确实很少。

您可以在snmpcmd手册页上查看一些命令行选项。值得注意的是,-d选项将转储发送的每个数据包的内容,并且解析起来有些不愉快,但至少它在那里(即使在某些平台上的地址输出是错误的)。坦率地说,考虑到您的需求,这可能是您最好的选择。

还有一种选择(这将是极其不愉快的),您可以自己编写一个UDP代理,简单地充当各种SNMP实用程序的中介,(在计数之后)自己转发每个数据包。这不需要提升权限,因为SNMP 不需要端口为161——它完全满足于回答来自您发送的任何高端口的查询。因此,您可以绑定到您希望的本地主机接口上的任何端口,并将SNMP工具指向该端口,代理只需将它看到的所有内容来回重复到"真正的"目的地。

老实说,除非远程防火墙运行在一个陈旧的马铃薯上,否则防火墙任务永远不会造成显著的CPU负载,除非您确实数千规则。

snmpget特别允许您传递-d标志,记录发送到stdout的数据包(可以重定向)。

$ snmpget -d -v2c -c private 192.168.1.1 1.3.6.1.2.1.1.2.0
No log handling enabled - using stderr logging
Sending 44 bytes to UDP: [192.168.1.1]:161->[0.0.0.0]:0
. . .
Timeout: No Response from 192.168.1.1.

如果您测试的进程不在您的控制之下,我更喜欢使用不需要编码的东西:tcpdumpstrace。根据我的经验,当调用的参数/目的非常有限时,可以请求系统操作人员/开发人员进入有限的sudoers。

如果sudoers修改是绝对不允许的,你可以将你的测试应用定向到一个手工制作的代理应用,该代理应用被配置为进一步发送数据包,并将进行计数。

最新更新