Python性能-最佳并行方法



我正在实现一个Python脚本,需要在不到5秒的时间内并行发送1500多个数据包。

简单地说,我需要的是:

def send_pkts(ip):
    #craft packet
    while True:
        #send packet
        time.sleep(randint(0,3))
for x in list[:1500]:
    send_pkts(x)
    time.sleep(randint(1,5))

我已经尝试了简单的单线程,多线程,多处理和多处理+多线程形式,并有以下问题:

  1. 简单的单线程:"for delay"似乎损害了"5秒"的依赖关系。
  2. 多线程:我认为由于Python GIL的限制,我无法完成我想要的。
  3. 多处理:这似乎是最有效的方法。然而,由于进程数量过多,我运行脚本的虚拟机冻结了(当然,1500个进程正在运行)。因此变得不切实际。
  4. 多处理+多线程:在这种方法中,我创建了更少的进程,每个进程调用一些线程(假设:10个进程每个调用150个线程)。很明显,VM不会像方法3那样冻结得那么快,但是我能达到的最大"并发数据包发送"是~800。吉尔的限制?VM的限制?在这个尝试中,我也尝试使用进程池,但结果相似。

有更好的方法来完成这个任务吗?

[1]编辑1:

 def send_pkt(x):
     #craft pkt
     while True:
         #send pkt
         gevent.sleep(0)
 gevent.joinall([gevent.spawn(send_pkt, x) for x in list[:1500]])

[2] EDIT 2 (gevent monkey- patchching):

from gevent import monkey; monkey.patch_all()
jobs = [gevent.spawn(send_pkt, x) for x in list[:1500]]
gevent.wait(jobs)
#for send_pkt(x) check [1]

然而,我得到了以下错误:"ValueError: filedescriptor out of range in select()"。所以我检查了我的系统极限(软和硬都是最大:65536)。之后,我检查了它与Linux上的select()限制(最大1024 fds)有关。请查看:http://man7.org/linux/man-pages/man2/select.2.html (bug部分)-为了克服这个问题,我应该使用poll() (http://man7.org/linux/man-pages/man2/poll.2.html)。但是对于poll(),我又回到了同样的限制:因为轮询是一种"阻塞方法"。

Regards

在Python中使用并行性时,一个好的方法是使用ThreadPoolExecutor或ProcessPoolExecutor fromhttps://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures根据我的经验,这些方法很有效。

一个threaddpoolexecutor的示例,可以根据您的使用进行调整。

import concurrent.futures
import urllib.request
import time
IPs= ['168.212. 226.204',
        '168.212. 226.204',
        '168.212. 226.204',
        '168.212. 226.204',
        '168.212. 226.204']
def send_pkt(x):
  status = 'Failed'
  while True:
    #send pkt
    time.sleep(10)
    status = 'Successful'
    break
  return status
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    future_to_ip = {executor.submit(send_pkt, ip): ip for ip in IPs}
    for future in concurrent.futures.as_completed(future_to_ip):
        ip = future_to_ip[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (ip, exc))
        else:
            print('%r send %s' % (url, data))

选项3的结果:"由于进程数量过多,我正在运行脚本的VM冻结(当然,正在运行1500个进程)"可能需要进一步调查。我认为,从目前收集到的信息来看,可能还不能确定这是多处理方法的缺点,还是VM的限制。

一个相当简单和直接的方法是运行一个缩放实验:不是让所有的发送都来自单个进程,也不是所有的发送都来自相同的进程,而是尝试中间值。计算在两个进程之间将工作负载分成两半所花费的时间,或者4、8等等。

在这样做的同时,在Windows上运行xperf或Linux上运行oprofile这样的工具也可能是一个好主意,以记录这些不同的并行性选择是否会导致不同类型的瓶颈,例如CPU缓存的冲击,运行虚拟机内存不足,或者谁知道其他什么。最简单的方法就是试一试。

根据以前处理这类问题的经验和一般经验法则,我认为当多处理进程的数量小于或等于可用CPU内核的数量时(无论是在VM本身还是在虚拟机管理程序上),性能会达到最佳。然而,这是假设问题是CPU限制;如果在数据包发送过程中出现阻塞,那么在超过#cpu进程的情况下,性能可能仍然会更高,如果与其他阻塞操作交错,则可以更好地利用cpu时间。不过,在一些分析和/或缩放实验完成之前,我们也不知道。

你是正确的,python是单线程的,但是你想要的任务(发送网络数据包)被认为是io绑定的操作,因此是多线程的一个很好的候选者。你的主线程在传输数据包时并不繁忙,只要你在编写代码时考虑到异步。

查看异步tcp网络的python文档- https://docs.python.org/3/library/asyncio-protocol.html#tcp-echo-client.

如果瓶颈是基于http的("发送数据包"),那么GIL实际上应该不是太大的问题。

如果python内部也在进行计算,那么GIL可能会碍事,就像你说的,基于进程的并行性更可取。

每个任务不需要一个进程!这似乎是你思想上的疏忽。使用python的Pool类,您可以轻松地创建一组工作人员,这些工作人员将从队列中接收任务。


import multiprocessing

def send_pkts(ip):
   ...

number_of_workers = 8
with multiprocessing.Pool(number_of_workers) as pool:
    pool.map(send_pkts, list[:1500])

您现在正在运行number_of_workers + 1进程(工作进程+原始进程),并且N个工作进程并发地运行send_pkts函数。

阻碍您实现理想性能的主要问题是send_pkts()方法。它不仅发送数据包,还制作数据包:

def send_pkts(ip):
#craft packet
while True:
    #send packet
    time.sleep(randint(0,3))

虽然发送数据包几乎肯定是一个I/O绑定任务,但制作数据包几乎肯定是一个CPU绑定任务。这个方法需要分成两个任务:

  1. 生成数据包
  2. 发送数据包

我已经写了一个基本的套接字服务器和一个客户端应用程序,制作和发送数据包到服务器。这个想法是有一个单独的进程来制作数据包并将它们放入队列中。有一个线程池与包制作进程共享队列。这些线程将数据包从队列中取出并发送给服务器。它们还将服务器的响应粘贴到另一个共享队列中,但这只是为了我自己的测试,与您要做的事情无关。当线程从队列中获得None(毒丸)时退出。

server.py:

import argparse
import socketserver
import time

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--host", type=str, help="bind to host")
    parser.add_argument("--port", type=int, help="bind to port")
    parser.add_argument("--packet-size", type=int, help="size of packets")
    args = parser.parse_args()
    HOST, PORT = args.host, args.port
    class MyTCPHandler(socketserver.BaseRequestHandler):
        def handle(self):
            time.sleep(1.5)
            data = self.request.recv(args.packet_size)
            self.request.sendall(data.upper())
    with socketserver.ThreadingTCPServer((HOST, PORT), MyTCPHandler) as server:
        server.serve_forever()

client.py:

import argparse
import logging
import multiprocessing as mp
import os
import queue as q
import socket
import time
from threading import Thread

def get_logger():
    logger = logging.getLogger("threading_example")
    logger.setLevel(logging.INFO)
    fh = logging.FileHandler("client.log")
    fmt = '%(asctime)s - %(threadName)s - %(levelname)s - %(message)s'
    formatter = logging.Formatter(fmt)
    fh.setFormatter(formatter)
    logger.addHandler(fh)
    return logger

class PacketMaker(mp.Process):
    def __init__(self, result_queue, max_packets, packet_size, num_poison_pills, logger):
        mp.Process.__init__(self)
        self.result_queue = result_queue
        self.max_packets = max_packets
        self.packet_size = packet_size
        self.num_poison_pills = num_poison_pills
        self.num_packets_made = 0
        self.logger = logger
    def run(self):
        while True:
            if self.num_packets_made >= self.max_packets:
                for _ in range(self.num_poison_pills):
                    self.result_queue.put(None, timeout=1)
                self.logger.debug('PacketMaker exiting')
                return
            self.result_queue.put(os.urandom(self.packet_size), timeout=1)
            self.num_packets_made += 1

class PacketSender(Thread):
    def __init__(self, task_queue, result_queue, addr, packet_size, logger):
        Thread.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue
        self.server_addr = addr
        self.packet_size = packet_size
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.connect(addr)
        self.logger = logger
    def run(self):
        while True:
            packet = self.task_queue.get(timeout=1)
            if packet is None:
                self.logger.debug("PacketSender exiting")
                return
            try:
                self.sock.sendall(packet)
                response = self.sock.recv(self.packet_size)
            except socket.error:
                self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                self.sock.connect(self.server_addr)
                self.sock.sendall(packet)
                response = self.sock.recv(self.packet_size)
            self.result_queue.put(response)

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--num-packets', type=int, help='number of packets to send')
    parser.add_argument('--packet-size', type=int, help='packet size in bytes')
    parser.add_argument('--num-threads', type=int, help='number of threads sending packets')
    parser.add_argument('--host', type=str, help='name of host packets will be sent to')
    parser.add_argument('--port', type=int, help='port number of host packets will be sent to')
    args = parser.parse_args()
    logger = get_logger()
    logger.info(f"starting script with args {args}")
    
    packets_to_send = mp.Queue(args.num_packets + args.num_threads)
    packets_received = q.Queue(args.num_packets)
    producers = [PacketMaker(packets_to_send, args.num_packets, args.packet_size, args.num_threads, logger)]
    senders = [PacketSender(packets_to_send, packets_received, (args.host, args.port), args.packet_size, logger)
               for _ in range(args.num_threads)]
    start_time = time.time()
    logger.info("starting workers")
    for worker in senders + producers:
        worker.start()
    for worker in senders:
        worker.join()
    logger.info("workers finished")
    end_time = time.time()
    print(f"{packets_received.qsize()} packets received in {end_time - start_time} seconds")

run.sh:

#!/usr/bin/env bash
for i in "$@"
do
case $i in
    -s=*|--packet-size=*)
    packet_size="${i#*=}"
    shift 
    ;;
    -n=*|--num-packets=*)
    num_packets="${i#*=}"
    shift 
    ;;
    -t=*|--num-threads=*)
    num_threads="${i#*=}"
    shift 
    ;;
    -h=*|--host=*)
    host="${i#*=}"
    shift 
    ;;
    -p=*|--port=*)
    port="${i#*=}"
    shift 
    ;;
    *)
    ;;
esac
done
python3 server.py --host="${host}" 
                  --port="${port}" 
                  --packet-size="${packet_size}" &
server_pid=$!
python3 client.py --packet-size="${packet_size}" 
                  --num-packets="${num_packets}" 
                  --num-threads="${num_threads}" 
                  --host="${host}" 
                  --port="${port}"
kill "${server_pid}"

。美元/run.sh - s = 1024 - n = 1500 - t = 300 - h = localhost - p = 9999

1500个包在4.70330023765564秒内收到

。美元/run.sh - s = 1024 - n = 1500 - t = 1500 - h = localhost - p = 9999

1500个包在1.5025699138641357秒内收到

可以通过将client.py中的日志级别更改为DEBUG来验证该结果。注意,脚本的完成时间确实比4.7秒长得多。当使用300个线程时,需要大量的拆卸,但是日志清楚地表明,线程在4.7秒内完成处理。

对所有性能结果持保留态度。我不知道你用的是什么系统。我将提供我的相关系统统计:2 Xeon X5550 @2.67GHz24MB DDR3 @1333MHzDebian 10Python 3.7.3


我会解决你尝试的问题:

  1. 简单的单线程:由于randint(0, 3)延迟,这几乎保证至少需要1.5 x num_packets秒
  2. 多线程:GIL可能是这里的瓶颈,但这可能是因为craft packet部分而不是send packet
  3. Multiprocessing:每个进程至少需要一个文件描述符,所以您可能超出了用户或系统限制,但如果您更改适当的设置,这可以工作
  4. 多处理+多线程:这失败的原因与#2相同,制作数据包可能是CPU绑定

经验法则是:I/O受限-使用线程,CPU受限-使用进程

最新更新