没有队列的Python多线程处理大型数据集



我正在运行一个大约800k行的csv文件。我需要一个线程化解决方案,该解决方案贯穿每一行,一次将32个线程派生到一个worker中。我想在不排队的情况下完成这项工作。看起来当前带有队列的python线程解决方案正在消耗大量内存。

基本上想要读取csv文件行并将其放入工作线程中。并且一次只希望运行32个线程。

这是当前脚本。它似乎正在将整个csv文件读取到队列中并执行queue.join()。它将整个csv加载到队列中然后生成线程是否正确?

queue=Queue.Queue()
def worker():
    while True:
        task=queue.get()
        try:
            subprocess.call(['php {docRoot}/cli.php -u "api/email/ses" -r "{task}"'.format(
                docRoot=docRoot,
                task=task
            )],shell=True)
        except:
            pass
        with lock:
            stats['done']+=1
            if int(time.time())!=stats.get('now'):
                stats.update(
                    now=int(time.time()),
                    percent=(stats.get('done')/stats.get('total'))*100,
                    ps=(stats.get('done')/(time.time()-stats.get('start')))
                )
                print("r    {percent:.1f}% [{progress:24}] {persec:.3f}/s ({done}/{total}) ETA {eta:<12}".format(
                    percent=stats.get('percent'),
                    progress=('='*int((23*stats.get('percent'))/100))+'>',
                    persec=stats.get('ps'),
                    done=int(stats.get('done')),
                    total=stats.get('total'),
                    eta=snippets.duration.time(int((stats.get('total')-stats.get('done'))/stats.get('ps')))
                ),end='')
           queue.task_done()

    for i in range(32):
        workers=threading.Thread(target=worker)
        workers.daemon=True
        workers.start()
    try:
        with open(csvFile,'rb') as fh:
        try:
                dialect=csv.Sniffer().sniff(fh.readline(),[',',';'])
            fh.seek(0)
            reader=csv.reader(fh,dialect)
            headers=reader.next()
        except csv.Error as e:
            print("rERROR[CSV] {error}n".format(error=e))
        else:
            while True:
            try:
                data=reader.next()
            except csv.Error as e:
                print("rERROR[CSV] - Line {line}: {error}n".format(                                       line=reader.line_num, error=e))
            except StopIteration:
                break
            else:
                stats['total']+=1
             queue.put(urllib.urlencode(dict(zip(headers,data)+dict(campaign=row.get('Campaign')).items())))
        queue.join()

32个线程可能会被过度使用,除非您有一些巨大的硬件可用。

最佳线程或进程数的经验法则是:(no. of cores * 2) - 1在大多数硬件上为7或15。

最简单的方法是启动7个线程,每个线程传递一个"偏移量"作为参数。即从0到7的数字。

然后,每个线程都会跳过行,直到达到"偏移"数并处理该行。处理完该行后,它可以跳过6行并处理第7行——重复直到没有更多行。

这种设置适用于线程和多个进程,在大多数机器上的I/O非常高效,因为所有线程在任何给定时间都应该读取文件的大致相同部分。

我应该补充一点,这个方法对python特别好,因为每个线程在启动后或多或少都是独立的,并且避免了其他方法常见的可怕的python全局锁。

我不明白你为什么要每行生成32个线程。然而,并行数据处理是一件非常常见的、令人尴尬的并行工作,并且可以使用Python的multiprocessing库轻松实现。

示例:

from multiprocessing import Pool
def job(args):
    # do some work
inputs = [...]  # define your inputs
Pool().map(job, inputs)

我让您来填补空白,以满足您的特定要求

请参阅:https://bitbucket.org/ccaih/ccav/src/tip/bin/许多这种模式的例子。

其他答案解释了如何在不必管理队列的情况下使用Pool(它为您管理队列),并且您不希望将进程数设置为32,而是将CPU计数设置为-1。我想补充两件事。首先,您可能想看看pandas包,它可以轻松地将csv文件导入Python。第二,在其他答案中使用Pool的例子只传递了一个接受单个参数的函数。不幸的是,您只能向Pool传递一个带有函数所有输入的对象,这使得使用带有多个参数的函数变得困难。以下代码允许您使用池调用以前定义的具有多个参数的函数:

import multiprocessing
from multiprocessing import Pool
def multiplyxy(x,y):
    return x*y

def funkytuple(t):
    """
    Breaks a tuple into a function to be called and a tuple
    of arguments for that function. Changes that new tuple into
    a series of arguments and passes those arguments to the
    function.
    """
    f = t[0]
    t = t[1]

    return f(*t)

def processparallel(func, arglist):
    """
    Takes a function and a list of arguments for that function
    and proccesses in parallel.
    """
    parallelarglist = []
    for entry in arglist:
        parallelarglist.append((func, tuple(entry)))
    cpu_count = int(multiprocessing.cpu_count() - 1)

    pool = Pool(processes = cpu_count)
    database = pool.map(funkytuple, parallelarglist)
    pool.close()
    return database
#Necessary on Windows
if __name__ == '__main__':
    x = [23, 23, 42, 3254, 32]
    y = [324, 234, 12, 425, 13]
    i = 0
    arglist = []
    while i < len(x):
        arglist.append([x[i],y[i]])
        i += 1

    database = processparallel(multiplyxy, arglist)
    print(database)

您的问题很不清楚。您是否尝试过将Queue初始化为最大大小(例如64)?

myq = Queue.Queue(maxsize=64)

然后,试图在myq.put()新项目的生产者(一个或多个)将阻止,直到消费者将队列大小减小到小于64。这将相应地限制队列所消耗的内存量。默认情况下,队列是无限制的:如果生产者添加项目的速度快于消费者删除项目的速度,那么队列可能会增长到消耗您所有的RAM。

编辑

这是当前脚本。它似乎正在阅读将整个csv文件放入队列并执行queue.join().Is它纠正了将整个csv加载到队列中的错误然后生成线程?

你的帖子中的缩进搞砸了,所以必须猜测一些,但是:

  1. 该代码显然在打开CSV文件之前启动了32个线程
  2. 您没有显示创建队列的代码。如上所述,如果它是Queue.Queue,默认情况下它是无边界的,并且如果主循环在其上放置项目的速度快于线程从中删除项目的速度,则可以增长到任何大小。由于您没有说明worker()的功能(或显示其代码),我们没有足够的信息来猜测是否是这样。但记忆的使用是失控的表明就是这样
  3. 而且,正如所解释的,您可以通过在创建队列时指定最大大小来轻松停止这种情况

要获得更好的答案,请提供更好的信息;-)

另一个编辑

嗯,压痕还是有点乱,但好多了。您是否尝试过任何建议?看起来你的工作线程每个都会产生一个新的进程,所以它们需要比从csv文件中读取另一行更长的时间。因此,很有可能你把物品放在队列中的速度远远快于它们被拿走的速度。因此,这是第无数次;-)TRY使用(例如)maxsize=64初始化队列。然后揭示发生了什么。

顺便说一句,worker()中裸露的except:子句是一个非常糟糕的想法。如果出了什么问题,你永远不会知道。如果忽略所有可能的异常(甚至包括KeyboardInterruptSystemExit),至少要记录异常信息。

请注意@JamesAnderson所说的:除非你有非凡的硬件资源,否则尝试一次运行32个进程几乎肯定比运行不超过可用内核数量两倍的进程慢。再说一遍,这在很大程度上取决于PHP程序的功能。例如,如果PHP程序大量使用磁盘I/O,任何多处理都可能比没有慢。

最新更新