我需要处理两个大文件(>10亿行(,并根据一个文件中特定行的信息将每个文件拆分为小文件。这些文件在blocks
中记录高通量测序数据(我们称之为测序reads
(,而每个read
包含4行(name
、sequence
、n
、quality
(。read
记录在两个文件中的顺序相同。
待办事项
基于file2.fq
、中的id
字段拆分file1.fq
这两个文件看起来像这样:
$ head -n 4 file1.fq
@name1_1
ACTGAAGCGCTACGTCAT
+
A#AAFJJJJJJJJFJFFF
$ head -n 4 file2.fq
@name1_2
TCTCCACCAACAACAGTG
+
FJJFJJJJJJJJJJJAJJ
我编写了以下python函数来完成这项工作:
def p7_bc_demx_pe(fn1, fn2, id_dict):
"""Demultiplex PE reads, by p7 index and barcode"""
# prepare writers for each small files
fn_writer = {}
for i in id_dict:
fn_writer[i] = [open(id_dict[i] + '.1.fq', 'wt'),
open(id_dict[i] + '.2.fq', 'wt')]
# go through each record in two files
with open(fn1, 'rt') as f1, open(fn2, 'rt') as f2:
while True:
try:
s1 = [next(f1), next(f1), next(f1), next(f1)]
s2 = [next(f2), next(f2), next(f2), next(f2)]
tag = func(s2) # a function to classify the record
fn_writer[tag][0].write(''.join(s1))
fn_writer[tag][1].write(''.join(s2))
except StopIteration:
break
# close writers
for tag in p7_bc_writer:
fn_writer[tag][0].close() # close writers
fn_writer[tag][1].close() # close writers
问题
有没有办法加快这一进程?(以上功能太慢了(
如何将大文件拆分为具有特定lines
的块(如f.seek(((,并使用多个核心并行运行该过程?
编辑-1
每个文件总共有5亿次读取(大小约为180 GB(。瓶颈是reading and writing
文件。以下是我目前的解决方案(它有效,但绝对不是最好的(
我首先使用shell命令split -l
将大文件拆分为小文件(大约需要3个小时(。
然后,将功能并行应用于8个小文件(耗时约1小时(
最后,合并结果(大约需要2小时(
还没有尝试PySpark,谢谢@John H
深入了解Spark。您可以将文件分布在一个集群中,以获得更快的处理速度。有一个python API:pyspark。
https://spark.apache.org/docs/0.9.0/python-programming-guide.html
这也为您提供了实际执行Java代码的优势,它不会受到GIL的影响,并允许真正的多线程。