Python多线程文件处理



我有几个文件驻留在服务器上,我试图实现一个多线程进程,以提高性能,我读了一个教程,但有几个问题实现它,

这些是文件,

filelistread = ['h:\file1.txt', 
                'h:\file2.txt', 
                'h:\file3.txt', 
                'h:\file4.txt']
filelistwrte = ['h:\file1-out.txt','h:\file2-out.txt','h:\file3-out.txt','h:\file4-out.txt']

def workermethod(inpfile, outfile):
    f1 = open(inpfile,'r')
    f2 = open(outfile,'w')
    x = f1.readlines()
    for each in x:
        f2.write(each)
    f1.close()
    f2.close()

我如何实现使用线程类和队列?

我从下面的类开始,但不确定如何将inpfile和outputfile传递给run方法..任何输入都是赞赏的

class ThreadUrl(threading.Thread):
    def __init__(self,queue):
        threading.Thread.__init__(self)
        self.queue = queue
    def run(self):
        while True:
            item = self.queue.get()

你把两种不同的溶液混在一起了。

如果你想为每个文件创建一个专用的工作线程,你不需要任何队列。如果你想创建一个线程池和一个文件队列,你不希望inpfileoutfile传递给run方法;您希望将它们放在队列上的每个作业中。

你如何在两者之间做出选择?好吧,第一种显然更简单,但是如果你有1000个文件要复制,你最终会创建1000个线程,这比你想要创建的线程要多,而且远远超过了操作系统能够处理的并行副本数量。线程池允许您创建,比如说,8个线程,并在队列上放置1000个作业,它们将根据需要分配给线程,因此一次运行8个作业。

让我们从解决方案1开始,为每个文件创建一个专用的工作线程。

首先,如果您没有与子类化Thread结婚,那么在这里确实没有理由这样做。您可以将target函数和args元组传递给默认构造函数,然后run方法将按照您的要求执行target(*args)。所以:

t = threading.Thread(target=workermethod, args=(inpfile, outfile))

这就是你所需要的。当每个线程运行时,它将调用workermethod(inpfile, outfile)然后退出。

但是,如果您确实出于某种原因想要子类化Thread,您可以这样做。您可以在构造时传入inpfileoutfile,而您的run方法将只是workermethod修改为使用self.inpfileself.outfile而不是接受参数。这样的:
class ThreadUrl(threading.Thread):
    def __init__(self, inpfile, outfile):
        threading.Thread.__init__(self)
        self.inpfile, self.outfile = inpfile, outfile
    def run(self):
        f1 = open(self.inpfile,'r')
        f2 = open(self.outfile,'w')
        x = f1.readlines()
        for each in x:
            f2.write(each)
        f1.close()
        f2.close()

无论哪种方式,我建议使用with语句而不是显式的openclose,并且去掉readlines(它不必要地将整个文件读取到内存中),除非您需要处理真正旧版本的Python:

    def run(self):
        with open(self.inpfile,'r') as f1, open(self.outfile,'w') as f2:
            for line in f1:
                f2.write(line)

现在,看看解决方案2:一个线程池和一个队列。

再次强调,这里不需要子类;两种方法之间的差异与解决方案1中相同。但是坚持你已经开始的子类设计,你想要这样的东西:

class ThreadUrl(threading.Thread):
    def __init__(self,queue):
        threading.Thread.__init__(self)
        self.queue = queue
    def run(self):
        while True:
            inpfile, outfile = self.queue.get()
            workermethod(inpfile, outfile)

然后你通过传递一个queue来启动你的线程:

q = queue.Queue
threadpool = [ThreadUrl(q) for i in range(poolsize)]

并像这样提交作业:

q.put((inpfile, outfile))

如果你要认真处理线程池,你可能想要使用一个健壮、灵活、简单和优化的实现,而不是自己编码。例如,您可能希望能够取消作业,很好地关闭队列,加入整个池而不是逐个加入线程,执行批处理或智能负载平衡等。

如果您正在使用Python 3,您应该查看标准库ThreadPoolExecutor。如果您坚持使用Python 2,或者无法弄清楚Future s,您可能需要查看隐藏在multiprocessing模块中的ThreadPool类。这两种方法都有一个优点,即从多线程切换到多处理(例如,如果您有一些cpu密集型的工作需要与IO并行处理)是微不足道的。你也可以搜索PyPI,你会发现许多其他好的实现。

作为旁注,您不希望调用队列queue,因为这会遮蔽模块名称。此外,有一个叫做workermethod的东西,它实际上是一个自由函数,而不是一个方法,这有点令人困惑。

最后,如果您所做的只是复制文件,那么您可能不想以文本模式读取,或者逐行读取。事实上,你可能根本不想自己实现它;只需使用shutil中适当的复制函数。你可以很容易地用上述任何一种方法做到这一点。例如:

t = threading.Thread(target=workermethod, args=(inpfile, outfile))

这样做:

t = threading.Thread(target=shutil.copyfile, args=(inpfile, outfile))

事实上,看起来你的整个程序都可以被替换:

threads = [threading.Thread(target=shutil.copyfile, args=(inpfile, outfile))
           for (inpfile, outfile) in zip(filelistread, filelistwrte)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

相关内容

  • 没有找到相关文章