使用Python加快并行读取大文件的速度



我需要处理两个大文件(>10亿行(,并根据一个文件中特定行的信息将每个文件拆分为小文件。这些文件在blocks中记录高通量测序数据(我们称之为测序reads(,而每个read包含4行(namesequencenquality(。read记录在两个文件中的顺序相同。

待办事项

基于file2.fq、中的id字段拆分file1.fq

这两个文件看起来像这样:

$ head -n 4 file1.fq
@name1_1
ACTGAAGCGCTACGTCAT
+
A#AAFJJJJJJJJFJFFF
$ head -n 4 file2.fq
@name1_2
TCTCCACCAACAACAGTG
+
FJJFJJJJJJJJJJJAJJ

我编写了以下python函数来完成这项工作:

def p7_bc_demx_pe(fn1, fn2, id_dict):
"""Demultiplex PE reads, by p7 index and barcode"""
# prepare writers for each small files
fn_writer = {}
for i in id_dict:
fn_writer[i] = [open(id_dict[i] + '.1.fq', 'wt'),
open(id_dict[i] + '.2.fq', 'wt')]
# go through each record in two files
with open(fn1, 'rt') as f1, open(fn2, 'rt') as f2:
while True:
try:
s1 = [next(f1), next(f1), next(f1), next(f1)]
s2 = [next(f2), next(f2), next(f2), next(f2)]
tag = func(s2) # a function to classify the record
fn_writer[tag][0].write(''.join(s1))
fn_writer[tag][1].write(''.join(s2))
except StopIteration:
break
# close writers
for tag in p7_bc_writer: 
fn_writer[tag][0].close() # close writers
fn_writer[tag][1].close() # close writers

问题

有没有办法加快这一进程?(以上功能太慢了(

如何将大文件拆分为具有特定lines的块(如f.seek(((,并使用多个核心并行运行该过程?

编辑-1

每个文件总共有5亿次读取(大小约为180 GB(。瓶颈是reading and writing文件。以下是我目前的解决方案(它有效,但绝对不是最好的(

我首先使用shell命令split -l将大文件拆分为小文件(大约需要3个小时(。

然后,将功能并行应用于8个小文件(耗时约1小时(

最后,合并结果(大约需要2小时(

还没有尝试PySpark,谢谢@John H

深入了解Spark。您可以将文件分布在一个集群中,以获得更快的处理速度。有一个python API:pyspark。

https://spark.apache.org/docs/0.9.0/python-programming-guide.html

这也为您提供了实际执行Java代码的优势,它不会受到GIL的影响,并允许真正的多线程。

最新更新