Handling large files in Python [1000 GB or more]



Let's say I have a text file of 1000 GB. I need to find out how many times a phrase occurs in that text.

Is there any faster way to do this than what I am using below? And roughly how much time should the task take?

phrase = "how fast it is"
count = 0
with open('bigfile.txt') as f:
    for line in f:
        count += line.count(phrase)

If I am right, then if the file is not already in memory I would have to wait for the PC to load it from disk every time I run the search, which should take at least 4000 s for a 250 MB/s hard drive and a 1000 GB file (1,000,000 MB ÷ 250 MB/s = 4000 s).

I used file.read() to read the data in chunks; in the examples below the chunk sizes are 100 MB, 500 MB, 1 GB and 2 GB respectively. The size of my text file is 2.1 GB.

Code:

from functools import partial

def read_in_chunks(size_in_bytes):
    s = 'Lets say i have a text file of 1000 GB'
    with open('data.txt', 'r+b') as f:
        prev = ''
        count = 0
        f_read = partial(f.read, size_in_bytes)
        for text in iter(f_read, ''):
            if not text.endswith('\n'):
                # if file contains a partial line at the end, then don't
                # use it when counting the substring count.
                text, rest = text.rsplit('\n', 1)
                # pre-pend the previous partial line if any.
                text = prev + text
                prev = rest
            else:
                # if the text ends with a '\n' then simply pre-pend the
                # previous partial line.
                text = prev + text
                prev = ''
            count += text.count(s)
        count += prev.count(s)
        print count

Timings:

read_in_chunks(104857600)
$ time python so.py
10000000
real    0m1.649s
user    0m0.977s
sys     0m0.669s

read_in_chunks(524288000)
$ time python so.py
10000000
real    0m1.558s
user    0m0.893s
sys     0m0.646s

read_in_chunks(1073741824)
$ time python so.py
10000000
real    0m1.242s
user    0m0.689s
sys     0m0.549s

read_in_chunks(2147483648)
$ time python so.py
10000000
real    0m0.844s
user    0m0.415s
sys     0m0.408s

On the other hand, the simple loop version takes around 6 seconds on my system:

def simple_loop():
    s = 'Lets say i have a text file of 1000 GB'
    with open('data.txt') as f:
        print sum(line.count(s) for line in f)

$ time python so.py
10000000
real    0m5.993s
user    0m5.679s
sys     0m0.313s

Results of @SlaterTyranus's grep version on my file:

$ time grep -o 'Lets say i have a text file of 1000 GB' data.txt|wc -l
10000000
real    0m11.975s
user    0m11.779s
sys     0m0.568s

Results of @woot's solution:

$ time cat data.txt | parallel --block 10M --pipe grep -o 'Lets say i have a text file of 1000 GB' | wc -l
10000000
real    0m5.955s
user    0m14.825s
sys     0m5.766s

And the best timing I got was with a block size of 100 MB:

$ time cat data.txt | parallel --block 100M --pipe grep -o 'Lets say i have a text file of 1000 GB' | wc -l
10000000
real    0m4.632s
user    0m13.466s
sys     0m3.290s

Results of woot's second solution:

$ time python woot_thread.py # CHUNK_SIZE = 1073741824
10000000
real    0m1.006s
user    0m0.509s
sys     0m2.171s
$ time python woot_thread.py  #CHUNK_SIZE = 2147483648
10000000
real    0m1.009s
user    0m0.495s
sys     0m2.144s

System specs: Core i5-4670, 7200 RPM HDD

Here is a Python attempt... You may need to play with THREADS and CHUNK_SIZE. It is also a bunch of code put together in a short time, so I may not have thought of everything. I do overlap my buffers to catch matches that fall in between chunks, and I extend the last chunk to include the remainder of the file.

import os
import threading
INPUTFILE ='bigfile.txt'
SEARCH_STRING='how fast it is'
THREADS = 8  # Set to 2 times number of cores, assuming hyperthreading
CHUNK_SIZE = 32768
FILESIZE = os.path.getsize(INPUTFILE)
SLICE_SIZE = FILESIZE / THREADS

class myThread (threading.Thread):
    def __init__(self, filehandle, seekspot):
        threading.Thread.__init__(self)
        self.filehandle = filehandle
        self.seekspot = seekspot
        self.cnt = 0
    def run(self):
        self.filehandle.seek( self.seekspot )
        p = self.seekspot
        if FILESIZE - self.seekspot < 2 * SLICE_SIZE:
            readend = FILESIZE
        else: 
            readend = self.seekspot + SLICE_SIZE + len(SEARCH_STRING) - 1
        overlap = ''
        while p < readend:
            if readend - p < CHUNK_SIZE:
                buffer = overlap + self.filehandle.read(readend - p)
            else:
                buffer = overlap + self.filehandle.read(CHUNK_SIZE)
            if buffer:
                self.cnt += buffer.count(SEARCH_STRING)
            overlap = buffer[len(buffer)-len(SEARCH_STRING)+1:]
            p += CHUNK_SIZE
filehandles = []
threads = []
for fh_idx in range(0,THREADS):
    filehandles.append(open(INPUTFILE,'rb'))
    seekspot = fh_idx * SLICE_SIZE
    threads.append(myThread(filehandles[fh_idx],seekspot ) )
    threads[fh_idx].start()
totalcount = 0 
for fh_idx in range(0,THREADS):
    threads[fh_idx].join()
    totalcount += threads[fh_idx].cnt
print totalcount

Have you looked at using parallel / grep?

cat bigfile.txt | parallel --block 10M --pipe grep -o 'how fast it is' | wc -l

Have you thought about indexing your file? The way a search engine works is by creating a mapping from words to their location in the file. Say you have a file like this:

Foo bar baz dar. Dar bar haa.

You would create an index that looks like this:

{
    "foo": {0},
    "bar": {4, 21},
    "baz": {8},
    "dar": {12, 17},
    "haa": {25},
}

Such a hash-table index can be looked up in O(1), so it is really fast.

If someone searches for the query "bar baz", you first break the query up into its constituent words, ["bar", "baz"], then look up {4, 21} and {8}, and use those positions to jump straight to the places where the query text could possibly exist.
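
A minimal in-memory sketch of that lookup, assuming the index fits in RAM (the file name and phrase are the ones from the question; for a 1000 GB file the index itself would have to live on disk, as the database answer further down does). Note that it matches whole words, which is subtly different from a raw substring count:

import re
from collections import defaultdict

def build_index(filename):
    # Map each lowercased word to the set of word offsets at which it occurs
    # (word offsets rather than the character offsets shown above, to keep
    # the adjacency check simple).
    index = defaultdict(set)
    offset = 0
    with open(filename) as f:
        for line in f:
            for word in re.findall(r'\w+', line.lower()):
                index[word].add(offset)
                offset += 1
    return index

def count_phrase(index, phrase):
    # Count the positions at which the words of the phrase occur consecutively.
    words = re.findall(r'\w+', phrase.lower())
    starts = index.get(words[0], set())
    for i, word in enumerate(words[1:], 1):
        nexts = index.get(word, set())
        starts = set(p for p in starts if p + i in nexts)
    return len(starts)

index = build_index('bigfile.txt')
print count_phrase(index, 'how fast it is')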

There are also off-the-shelf solutions for indexed search; for example Solr or ElasticSearch.

I would suggest using grep instead of Python. It will be faster, and generally speaking, if you are dealing with 1000 GB of text on your local machine you are doing something wrong. But all judgements aside, grep comes with a couple of options that will make your life easier.

grep -o '<your_phrase>' bigfile.txt|wc -l

Specifically, this counts how many times the desired phrase occurs: because -o prints every match on its own line, wc -l counts all occurrences, including multiple occurrences within a single line.

If you only need the number of matching lines instead, you can do this:

grep -c '<your_phrase>' bigfile.txt

We are talking about a simple count of a specific substring within a rather large data stream. The task is almost certainly I/O bound, but very easily parallelised. The first layer is the raw read speed; we can choose to reduce the amount that has to be read by using compression, or distribute the transfer rate by storing the data in several places. Then there is the search itself; substring searching is a well-known problem, again I/O limited. If the data set comes from a single disk, pretty much any optimisation is moot, since the disk cannot beat a single core in speed.

Assuming we do have chunks, which may for instance be the separate blocks of a bzip2 file (if we use a threaded decompressor), stripes in a RAID, or distributed nodes, we have much to gain from processing them individually. Each chunk is searched for the needle, and then the joints are handled by taking len(needle) - 1 bytes from the end of one chunk and the beginning of the next, and searching within that seam.
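
As a single-machine illustration of that joint check, here is a rough sketch (the chunk source and the needle are placeholders, and it assumes every chunk is at least len(needle) - 1 bytes long):

def count_with_seams(chunks, needle):
    # Count occurrences of needle across an iterable of chunks, including
    # matches that straddle a chunk boundary.
    border = len(needle) - 1
    total = 0
    tail = ''   # last len(needle)-1 bytes of the previous chunk
    for chunk in chunks:
        total += chunk.count(needle)
        if border:
            # A match inside this seam must straddle the boundary, since both
            # tail and chunk[:border] are shorter than the needle, so it was
            # not counted by either chunk on its own.
            total += (tail + chunk[:border]).count(needle)
            tail = (tail + chunk)[-border:]
    return total

def read_chunks(filename, chunksize=10 * 1024 * 1024):
    with open(filename, 'rb') as f:
        for chunk in iter(lambda: f.read(chunksize), ''):
            yield chunk

print count_with_seams(read_chunks('bigfile.txt'), 'how fast it is')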

A quick benchmark shows that the regular expression state machine runs faster than the usual in operator:

>>> timeit.timeit("x.search(s)", "s='a'*500000; import re; x=re.compile('foobar')", number=20000)
17.146117210388184
>>> timeit.timeit("'foobar' in s", "s='a'*500000", number=20000)
24.263535976409912
>>> timeit.timeit("n in s", "s='a'*500000; n='foobar'", number=20000)
21.562405109405518

Another optimisation step we can take, if the data lives in a file, is to mmap it rather than using the usual read calls. This lets the operating system use its disk buffers directly. It also allows the kernel to satisfy multiple read requests in arbitrary order without extra system calls, which lets us exploit things like an underlying RAID when operating in multiple threads.

Here is a quickly cobbled-together prototype. A few things could obviously be improved, such as distributing the chunk processing if we have a multi-node cluster, doing the tail+head check by passing one piece to the neighbouring worker (an order which is not known in this implementation) instead of sending both to a special worker, and implementing an inter-thread bounded queue (pipe) class instead of matching semaphores. It would probably also make sense to move the worker threads outside of the main thread function, since the main thread keeps altering its locals.

from mmap import mmap, ALLOCATIONGRANULARITY, ACCESS_READ
from re import compile, escape
from threading import Semaphore, Thread
from collections import deque
from os import fstat

def search(needle, filename):
    # Might want chunksize=RAID block size, threads
    chunksize=ALLOCATIONGRANULARITY*1024
    threads=32
    # Read chunk allowance
    allocchunks=Semaphore(threads)  # should maybe be larger
    chunkqueue=deque()   # Chunks mapped, read by workers
    chunksready=Semaphore(0)
    headtails=Semaphore(0)   # edges between chunks into special worker
    headtailq=deque()
    sumq=deque()     # worker final results
    # Note: although we do push and pop at differing ends of the
    # queues, we do not actually need to preserve ordering. 
    def headtailthread():
        # Since head+tail is 2*len(needle)-2 long, 
        # it cannot contain more than one needle
        htsum=0
        matcher=compile(escape(needle))
        heads={}
        tails={}
        while True:
            headtails.acquire()
            try:
                pos,head,tail=headtailq.popleft()
            except IndexError:
                break  # semaphore signaled without data, end of stream
            try:
                prevtail=tails.pop(pos-chunksize)
                if matcher.search(prevtail+head):
                    htsum+=1
            except KeyError:
                heads[pos]=head
            try:
                nexthead=heads.pop(pos+chunksize)
                if matcher.search(tail+nexthead):
                    htsum+=1
            except KeyError:
                tails[pos]=tail
        # No need to check spill tail and head as they are shorter than needle
        sumq.append(htsum)
    def chunkthread():
        threadsum=0
        # escape special characters to achieve fixed string search
        matcher=compile(escape(needle))
        borderlen=len(needle)-1
        while True:
            chunksready.acquire()
            try:
                pos,chunk=chunkqueue.popleft()
            except IndexError:   # End of stream
                break
            # Let the re module do the heavy lifting
            threadsum+=len(matcher.findall(chunk))
            if borderlen>0:
                # Extract the end pieces for checking borders
                head=chunk[:borderlen]
                tail=chunk[-borderlen:]
                headtailq.append((pos,head,tail))
                headtails.release()
            chunk.close()
            allocchunks.release()  # let main thread allocate another chunk
        sumq.append(threadsum)
    with open(filename,'rb') as infile:
        htt=Thread(target=headtailthread)
        htt.start()
        chunkthreads=[]
        for i in range(threads):
            t=Thread(target=chunkthread)
            t.start()
            chunkthreads.append(t)
        pos=0
        fileno=infile.fileno()
        filesize=fstat(fileno).st_size
        while pos<filesize:
            allocchunks.acquire()
            # map at most chunksize bytes; the final chunk may be shorter
            chunk=mmap(fileno, min(chunksize, filesize-pos), access=ACCESS_READ, offset=pos)
            chunkqueue.append((pos,chunk))
            chunksready.release()
            pos+=chunksize
        # File ended, finish all chunks
        for t in chunkthreads:
            chunksready.release()   # wake thread so it finishes
        for t in chunkthreads:
            t.join()    # wait for thread to finish
        headtails.release()     # post event to finish border checker
        htt.join()
        # All threads finished, collect our sum
        return sum(sumq)
if __name__=="__main__":
    from sys import argv
    print "Found string %d times"%search(*argv[1:])

Again, modifying the whole thing to use some map/reduce routine (map the chunks to counts, heads and tails; reduce by summing the counts and checking the tail+head pieces) is left as an exercise.

Edit: Since it seems this search will be repeated with different needles, an index would be much faster, as it can skip searching sections that are known not to match. One possibility is making a map of which blocks contain any occurrence of various n-grams (accounting for block boundaries by allowing the n-grams to overlap into the next block); those maps can then be combined to find more complex conditions, before the blocks of original data need to be loaded. There are certainly databases that do this; look for full-text search engines.
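
A rough sketch of such an n-gram map, purely as an illustration (the choice of n, the chunk size and the file name are made up here; for 1000 GB the map itself would have to live on disk or in a database, as in the answer below):

from collections import defaultdict

def ngrams(data, n):
    return (data[i:i + n] for i in xrange(len(data) - n + 1))

def build_ngram_map(filename, n=4, chunksize=10 * 1024 * 1024):
    # Map each byte n-gram to the set of chunk offsets whose data contains it.
    # Each chunk is extended by n-1 bytes into the next one, so n-grams that
    # straddle a trailing boundary are attributed to the earlier chunk.
    ngram_map = defaultdict(set)
    with open(filename, 'rb') as f:
        pos, pending = 0, f.read(chunksize)
        while pending:
            nxt = f.read(chunksize)
            for gram in ngrams(pending + nxt[:n - 1], n):
                ngram_map[gram].add(pos)
            pos += len(pending)
            pending = nxt
    return ngram_map

def candidate_chunks(ngram_map, needle, n=4):
    # Only chunks containing every n-gram of the needle can hold a match that
    # lies entirely inside one chunk; matches spanning a chunk boundary still
    # need the head/tail check described above. Assumes len(needle) >= n.
    sets = [ngram_map.get(gram, set()) for gram in ngrams(needle, n)]
    return set.intersection(*sets)

The actual substring search then only has to visit the chunk offsets returned by candidate_chunks (plus the chunk seams), instead of the whole file.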

Here is a third, longer method that uses a database. The database is sure to be larger than the text. I am not sure whether the indexes are optimal, and some space savings could come from playing with them a little (e.g. maybe WORD, POS, WORD is better, or maybe WORD, POS is just fine; it needs a bit of experimenting).

This may not perform well on 200 OK's test, though, because it contains a lot of repeated text, but it might perform well on more unique data.

First, create a database by scanning the file for words:

import sqlite3
import re
INPUT_FILENAME = 'bigfile.txt'
DB_NAME = 'words.db'
FLUSH_X_WORDS=10000

conn = sqlite3.connect(DB_NAME)
cursor = conn.cursor()

cursor.execute("""
CREATE TABLE IF NOT EXISTS WORDS (
     POS INTEGER
    ,WORD TEXT
    ,PRIMARY KEY( POS, WORD )
) WITHOUT ROWID
""")
cursor.execute("""
DROP INDEX IF EXISTS I_WORDS_WORD_POS
""")
cursor.execute("""
DROP INDEX IF EXISTS I_WORDS_POS_WORD
""")

cursor.execute("""
DELETE FROM WORDS
""")
conn.commit()
def flush_words(words):
    for word in words.keys():
        for pos in words[word]:
            cursor.execute('INSERT INTO WORDS (POS, WORD) VALUES( ?, ? )', (pos, word.lower()) )
    conn.commit()
words = dict()
pos = 0
recomp = re.compile(r'\w+')
with open(INPUT_FILENAME, 'r') as f:
    for line in f:
        for word in [x.lower() for x in recomp.findall(line) if x]:
            pos += 1
            if word in words:
                words[word].append(pos)
            else:
                words[word] = [pos]
        if pos % FLUSH_X_WORDS == 0:
            flush_words(words)
            words = dict()
    if len(words) > 0:
        flush_words(words)
        words = dict()

cursor.execute("""
CREATE UNIQUE INDEX I_WORDS_WORD_POS ON WORDS ( WORD, POS )
""")
cursor.execute("""
CREATE UNIQUE INDEX I_WORDS_POS_WORD ON WORDS ( POS, WORD )
""")
cursor.execute("""
VACUUM
""")
cursor.execute("""
ANALYZE WORDS
""")

Then generate the SQL for the search: a self-join of the WORDS table, with one alias per word in the phrase, joined on consecutive positions:

import sqlite3
import re
SEARCH_PHRASE = 'how fast it is'
DB_NAME = 'words.db'

conn = sqlite3.connect(DB_NAME)
cursor = conn.cursor()
recomp = re.compile(r'\w+')
search_list = [x.lower() for x in recomp.findall(SEARCH_PHRASE) if x]
from_clause = 'FROM\n'
where_clause = 'WHERE\n'
num = 0
fsep = '     '
wsep = '     '
for word in search_list:
    num += 1
    from_clause += '{fsep}words w{num}\n'.format(fsep=fsep,num=num)
    where_clause += "{wsep} w{num}.word = '{word}'\n".format(wsep=wsep, num=num, word=word)
    if num > 1:
        where_clause += "  AND w{num}.pos = w{lastnum}.pos + 1\n".format(num=str(num),lastnum=str(num-1))
    fsep = '    ,'
    wsep = '  AND'

sql = """{select}{fromc}{where}""".format(select='SELECT COUNT(*)\n',fromc=from_clause, where=where_clause)
res = cursor.execute( sql )
print res.fetchone()[0] 

I concede that grep will be faster. I am assuming this is a large string-based file.

But you could do the following if you really wanted to.

import os
import re
import mmap

fileName = 'bigfile.txt'
phrase = re.compile("how fast it is")

with open(fileName, 'r') as fHandle:
    data = mmap.mmap(fHandle.fileno(), os.path.getsize(fileName), access=mmap.ACCESS_READ)
    # findall returns every non-overlapping occurrence, not just a match at the start
    matches = phrase.findall(data)
    print('matches = {0}'.format(len(matches)))
