Python外部合并排序运行缓慢



我是一名CS学生(因此是编程新手(,我正在尝试用Python实现一个外部合并排序算法,就像下面这样。要排序的数据是一个大的CSV文件,大约有9.000.000行,比如

1743-11-01,6.068,1.736,StadtA,Denmark,57.05N,10.33E
1744-06-01,5.787,3.623,StadtB,Belgien,47.05N,10.33E

到目前为止,我有这个功能,它将csv文件分离为多个预先排序的块:

def splitFiles(self, largeFileName, smallFileSize):
largeFileHandler = csv.reader(open(largeFileName), delimiter=',')
tempBuffer = []
size = 0
for row in largeFileHandler:
tempBuffer.append(row)
size += 1
if (size % smallFileSize == 0):
tempBuffer.sort(key=lambda x: (x[5], -float(x[1]), x[3]))
tempFile = tempfile.NamedTemporaryFile(dir=self.cwd + '/temp', delete=False, mode="wb+")
writer = csv.writer(tempFile)
writer.writerows(tempBuffer)
tempFile.seek(0)
sortedTempFileHandlerList.append(tempFile)
tempBuffer = []

这意味着区块按第五个元素降序排列。若那个值相等,则按第一个元素递增排序,若那个值也相等,则按照第三个元素排序。

在那之前一切都很好。但是,将这些块合并为一个(最终(大文件需要很长时间。

def merge(self):
stack_tops = []
sink = csv.writer(open("outputfile", "w+"))
for f in sortedTempFileHandlerList:
stack_tops.append(next(csv.reader(f)))
while stack_tops:
c = min(stack_tops, key=lambda x: (x[5], -float(x[1]), x[3]))
sink.writerow(c)
i = stack_tops.index(c)
try:
t = next(csv.reader(sortedTempFileHandlerList[i]))
stack_tops[i] = t
except StopIteration:
del stack_tops[i]
self.sortedTempFileHandlerList[i].close()
del self.sortedTempFileHandlerList[i]

我的问题是,我如何才能让第二部分跑得更快?提前感谢!

如果您的硬盘驱动器正在旋转,那么问题的原因可能是磁盘寻道过多。如果您并行读取数十个文件,磁盘驱动器会放弃,每次读取都会进行,直到磁盘旋转。转速为6000转/分时,整个旋转时间为0-0.01秒,平均0.005秒。当你的文件之间有很多兆字节的数据时,每秒200次读取加起来。

解决方案是一次只合并几个文件。

我个人过去写这篇文章的方式是保留一堆包含{file: ___, size: ___}的对象。然后你的逻辑看起来是这样的:

while more to read:
read a chunk
write a sorted chunk
add file info to stack
while n < len(stack) and sum of sizes of top n-1 elements exceeds the nth:
pop top n objects off of stack
merge them
push merged element onto stack
merge stack to get answer

你必须玩它来计算合并的数量。但我成功地使用了这种策略来处理具有数十亿行的数据集,并且表现良好。我想我把n设置为4。

(在我的现实生活中,复杂的例子是,由于磁盘限制,我不得不压缩所有的中间文件。我也在合并和聚合结果。我本想在数据库中进行,但数据库没有空间了。这是一个有趣的项目…(


更多详细信息。

首先,由于这是Python,我建议您将splitFiles制作成迭代器。像这样的伪代码:

def splitFiles (inputFile):
open inputFile
create in memory data
while more to read:
add to data
if data is full:
write sorted temp file
yield (fileSize, tempFile)
if data:
write sorted temp file
yield (fileSize, tempFile)

这些yield使它成为迭代器。现在你的主要逻辑是这样的:

stack = []
for chunk in splitFile(dataFile):
stack.append(chunk)
while 3 < len(stack) and stack[-4][0] < sum((stack[-i][0] for i in [1, 2, 3])):
chunks = []
for _ in range(4):
chunks.append(stack.pop())
stack.append(mergeChunks(chunks))
# Now the final merge
while 1 < len(stack):
chunks = []
for _ in range(4):
if len(stack):
chunks.append(stack.pop())
stack.append(mergeChunks(chunks))
return stack[0][1]

在你提供的8 3MB块的例子中,你的堆栈会像这样进行:

[]
[(3_000_000, tempFile1)]
[(3_000_000, tempFile1), (3_000_000, tempFile2)]
[(3_000_000, tempFile1), (3_000_000, tempFile2), (3_000_000, tempFile3)]
[(3_000_000, tempFile1), (3_000_000, tempFile2), (3_000_000, tempFile3), (3_000_000, tempFile4)]
# First merge!
[(12_000_000_000, tempFile5)]
[(12_000_000_000, tempFile5), (3_000_000, tempFile6)]
[(12_000_000_000, tempFile5), (3_000_000, tempFile6), (3_000_000, tempFile7)]
[(12_000_000_000, tempFile5), (3_000_000, tempFile6), (3_000_000, tempFile7), (3_000_000, tempFile8)]
[(12_000_000_000, tempFile5), (3_000_000, tempFile6), (3_000_000, tempFile7), (3_000_000, tempFile8), (3_000_000, tempFile9)]
# Second merge!
[(12_000_000_000, tempFile5), (12_000_000_000, tempFile10)]
# End of input, final merge
[(24_000_000_000, tempFile11)]

最后一个文件就是你的结果。

@btilly让我们把它分解一下。在运行splitFiles方法之后,我得到了8个块,每个块3MB。堆栈看起来像

[(tempfile1, 3), (tempfile2, 3), (tempfile3, 3), (tempfile4, 3), (tempfile5, 3), (tempfile6, 3), (tempfile7, 3), (tempfile8, 3)]

当n为4(小于长度为8(,并且前n-1个元素的大小之和(9(大于第n个元素的尺寸(3(时,我们进入while循环。

前4个临时文件被合并,堆栈现在看起来像这样:

[(merged_chunk, 9), (tempfile5, 3), (tempfile6, 3), (tempfile7, 3), (tempfile8, 3)]

n仍然是4,堆栈的长度是5,并且前n-1个元素(15(的大小之和大于第n(3(个的大小

因此merged_chunk与tempfile5、tempfile6和tempfile7合并,堆栈如下:

[(merged_chunk, 18), (tempfile8, 3)]

现在我们正处于while条件失败的时刻,因为n大于堆栈的长度。到那时,merged_chunk和tempfile8会有最后一次合并吗?

最新更新