如何使用多处理池读取文件



我想读取包含2GB的内容的文件,我尝试使用多处理池进行操作,但是它会出现错误:

TypeError: 'type' object is not iterable

我知道,地图总是接受参数,但是有办法做到这一点吗?到目前为止,这是我的代码:

def load_embeddings(FileName):
    #file = open(FileName,'r')
    embeddings = {}
    i = 0
    print  "Loading word embeddings first time"
    for line in FileName:
             # print line
            tokens = line.split('t')
            tokens[-1] = tokens[-1].strip()
            #each line has 400 tokens
            for i in xrange(1, len(tokens)):
                    tokens[i] = float(tokens[i])
                    embeddings[tokens[0]] = tokens[1:-1]
    print  "finished"
    return embeddings
if __name__ == "__main__":
    t1 = time.time()
    p = Pool(processes=5)
    FileName  = './asag/Resources/EN-wform.w.5.cbow.neg10.400.subsmpl.txt'
    file_ = open(FileName,'r')
    #fun = partial(load_embeddings,FileName) 
    result = p.map(load_embeddings, file_)
    p.close()
    p.join()
    print ("Time it took :" + str(time.time() - t1))

如果源代码在单过程环境中运行,则您的源代码将是正确的。尽管您的参数FileName应命名为file,因为它确实是一个打开的文件句柄,而不是文件名(String)。

现在,发生的事情是,您正在为5个进程提供相同的文件处理。使用for line in FileName,您可以在文件句柄上进行读取操作。这在5个不同的过程中并行发生。所有人都不知道其他人(这就是它的美:因为操作系统都是不同的程序。但是它们都从同一文件句柄中读取)。现在,似乎这不是原子,并且在唯一部分读取该行后可以中断此呼叫。也可能是,python在内部缓冲,但缓冲区是每个过程。这导致在line或第一行的一部分和第二行的一部分中有一半行(因为Python只是读取直到看到第一个n),然后当您想进一步处理该行时会出现错误。

>

要解决此问题,您需要首先在主过程中读取文件,然后将行交给map函数,例如:

from multiprocessing import Pool
def load_embeddings(line):
    embeddings = {}
    i = 0
    tokens = line.split('t')
    tokens[-1] = tokens[-1].strip()
    #each line has 400 tokens
    for i in xrange(1, len(tokens)):
            tokens[i] = float(tokens[i])
            embeddings[tokens[0]] = tokens[1:-1]
    print "finished"
    return embeddings
if __name__ == "__main__":
    p = Pool(processes=5)
    file_name  = 'file.tsf'
    lines = []
    with open(file_name,'r') as f:
        for line in f:
            lines.append(line.strip())
    result = p.map(load_embeddings, lines)
    p.close()
    p.join()

相关内容

  • 没有找到相关文章

最新更新