如果我只能读取一行并在内存中只存储一行,如何对文本文件进行排序



我需要对用换行符分隔的大型文本文件进行排序。

我需要假设输入数据太大,无法放入主内存,这意味着我只能在内存中读取和存储文件的一行。因此,我无法制作一个列表或数组用于经典的排序算法(如mergesort、quicksort等(,因此我陷入了困境。

一个人怎么能解决这样的问题呢?

在实践中,在正常情况下,只需使用Unix排序命令。

LC_ALL=C sort file_in.txt > file_out.txt

对于一个非常大的文件,我会分布式使用排序构建到mapreduce中。请参阅MapReduce排序算法的工作原理?这是怎么回事。一个提示,如果你分散,保持分散。也就是说,输入文件、输出文件和操作都应该在分布式文件系统上,这样在任何一台机器上都不会出现瓶颈。

我确实有一次遇到过这样的情况:这两个都不起作用。当时的情况是,我需要对来自数据库的数据集进行排序和组织,但数据太大,无法在数据库中进行排序,而且我所在的机器没有空间容纳原始数据集。

我用mergesort解决了这个问题,在mergesort中,所有超过特定大小的块都保存在压缩文件中。关键逻辑是这样的:

for row in big query:
last_chunk = new chunk from row
chunk = None
while 0 < len(chunks):
chunk = chunks.pop()
if chunk.size < 1.2 * last_chunk.size:
last_chunk = merge(chunk, last_chunk)
else:
break
if chunk is not None:
chunks.append(chunk)
chunks.append(last_chunk)
while 1 < len(chunks):
chunks.append(merge(chunks.pop(), chunks.pop()))

然后在合并逻辑中,我对一个块是应该在内存中还是在压缩文件中结束、如何获取行以及如何写入行进行了推理。

(我的问题并没有这么简单,因为我在分组、求和、合并等。基本上是复制一个我无法在数据库中运行的报告,因为它没有足够的工作内存来运行它。(

这三个涵盖了我个人在大文件中遇到的每一种情况。


这是一个基于外部文件的合并排序的不太好但有效的实现。

首先,您需要我们要排序的demo.txt

hello
world
greetings
earthlings

现在Python代码按排序打印:

from tempfile import TemporaryFile
class FileBuffer():
def __init__ (self):
self.current_line = None
self.fh = TemporaryFile()
self.state = 'writing'
self.size = 0
def add (self, line):
if self.state != 'writing':
raise Exception(f"Cannot write to FileBuffer in state {self.state}")
self.size += len(line)
self.fh.write(bytes(line, encoding='utf-8'))
def finish_writing (self):
self.fh.seek(0, 0)
self.state = 'reading'
self.fetch()
return self.current_line
def fetch (self):
if self.state != 'reading':
raise Exception(f"Cannot read from FileBuffer in state {self.state}")
self.current_line = bytes.decode(self.fh.readline())
if self.current_line == '':
self.current_line = None
self.state = 'done'
return self.current_line
def __iter__(self):
return self
def __next__(self):
line = self.current_line
if line is None:
raise StopIteration
else:
self.fetch()
return line
class BufferedSort():
def __init__ (self):
self.buffers = []
def merge (self):
buffer1 = self.buffers.pop()
buffer2 = self.buffers.pop()
new_buffer = FileBuffer()
while buffer1.current_line is not None and buffer2.current_line is not None:
if buffer1.current_line < buffer2.current_line:
new_buffer.add(buffer1.current_line)
buffer1.fetch()
else:
new_buffer.add(buffer2.current_line)
buffer2.fetch()
while buffer1.current_line is not None:
new_buffer.add(buffer1.current_line)
buffer1.fetch()
while buffer2.current_line is not None:
new_buffer.add(buffer2.current_line)
buffer2.fetch()
new_buffer.finish_writing()
self.buffers.append(new_buffer)
def add (self, line):
buffer = FileBuffer()
buffer.add(line)
buffer.finish_writing()
self.buffers.append(buffer)
while 1 < len(self.buffers) and self.buffers[-2].size < 1.2 * self.buffers[-1].size:
self.merge()
def finish_writing(self):
while 2 < len(self.buffers):
self.merge()
def sorted_buffer(self):
self.finish_writing()
if len(self.buffers):
return self.buffers[0]
else:
buffer = FileBuffer()
buffer.state = 'done'
return buffer
def __iter__(self):
return self.sorted_buffer()
sorter = BufferedSort()
with open("demo.txt") as fh:
for line in fh:
sorter.add(line)
for line in sorter:
print(line, end="")

最新更新