I have some code that manipulates a text file. The text file is very large, and my current code needs 30 days to finish the computation.
If multiprocessing is the only way, I have a server with 40 cores.
cell_line_final2.bed:
chr1 778704 778912 MSPC_Peak_37509 8.43 cell_line GM12878 CTCF ENCSR000AKB CNhs12333 132
chr1 778704 778912 MSPC_Peak_37509 8.43 cell_line GM12878 CTCF ENCSR000AKB CNhs12331 132
chr1 778704 778912 MSPC_Peak_37509 8.43 cell_line GM12878 CTCF ENCSR000AKB CNhs12332 132
chr1 869773 870132 MSPC_Peak_37508 74.0 cell_line GM12878 CTCF ENCSR000AKB CNhs12333 132
...
...
tf_tpm2.bed:
CNhs12333 2228319 4.41 CTCF
CNhs12331 6419919 0.0 HES2
CNhs12332 6579994 0.78 ZBTB48
CNhs12333 8817465 0.0 RERE
...
...
The desired output is to add a column to "cell_line_final2.bed" wherever the 1st and 4th columns of "tf_tpm2.bed" match columns 10 and 8 of "cell_line_final2.bed":
chr1 778704 778912 MSPC_Peak_37509 8.43 cell_line GM12878 CTCF ENCSR000AKB CNhs12333 132 4.41
chr1 778704 778912 MSPC_Peak_37509 8.43 cell_line GM12878 HES2 ENCSR000AKB CNhs12331 132 0.0
chr1 778704 778912 MSPC_Peak_37509 8.43 cell_line GM12878 CTCF ENCSR000AKB CNhs12332 132 0.78
chr1 869773 870132 MSPC_Peak_37508 74.0 cell_line GM12878 RERE ENCSR000AKB CNhs12333 132 0.0
...
...
My code so far:
def read_file(file):
    with open(file) as f:
        current = []
        for line in f:  # read rest of lines
            current.append([x for x in line.split()])
    return current

inputfile = "/home/lside/Desktop/database_files/Cell_line_final2.bed"  # 2.7GB text file
outpufile = "/home/lside/Desktop/database_files/Cell_line_final3.bed"
file_in = read_file("/home/lside/Desktop/tf_TPM2.csv")  # 22.5MB text file
new_line = ""

with open(inputfile, 'r') as infile:
    with open(outpufile, 'w') as outfile:
        for line in infile:
            line = line.split("\t")
            for j in file_in:
                if j[0] == line[9] and j[3] == line[7]:
                    new_line = new_line + '{0}\t{1}\t{2}\t{3}\t{4}\t{5}\t{6}\t{7}\t{8}\t{9}\t{10}\t{11}\n'.format(line[0], line[1], line[2], line[3], line[4], line[5], line[6], line[7], line[8], line[9], line[10].rstrip(), j[2])
                    continue
            outfile.write(new_line)
I agree with the comments saying that this should not take 30 days to run, so the bottleneck must be somewhere else. Probably the biggest offender is the huge string you are building, instead of just dumping each line to file at every iteration (^).

Note

(^) The biggest offender is more likely the continue statement in the inner loop, since it always forces the code to compare the current line against all elements of the lookup file, instead of stopping at the first match. Replacing it with break should be the way to go.
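As a quick illustration of that footnote, here is a minimal, self-contained timing sketch (my own addition, not from the original code; the list size and match position are arbitrary):

import timeit

# 20,000 dummy lookup tuples; the match sits near the front of the list.
lookup_list = [(str(i), 'x', str(i)) for i in range(20000)]

def scan_with_break():
    for e0, e2, e3 in lookup_list:
        if e0 == '10' and e3 == '10':
            break  # stop at the first match

def scan_with_continue():
    for e0, e2, e3 in lookup_list:
        if e0 == '10' and e3 == '10':
            continue  # keeps scanning the remaining elements

print(timeit.timeit(scan_with_break, number=1000))     # fast: early exit
print(timeit.timeit(scan_with_continue, number=1000))  # slow: full scan every time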
Here is what I would do, to see how fast it goes:
def read_file(filename):
    with open(filename) as f:
        current = []
        for line in f:  # read rest of lines
            fields = line.split()
            e0, e2, e3 = fields[0], fields[2], fields[3]
            current.append((e0, e2, e3))  # you only use these three elements
    return current

inputfile = "/home/lside/Desktop/database_files/Cell_line_final2.bed"  # 2.7GB text file
outpufile = "/home/lside/Desktop/database_files/Cell_line_final3.bed"
file_in = read_file("/home/lside/Desktop/tf_TPM2.csv")  # 22.5MB text file

with open(inputfile, 'r') as infile:
    with open(outpufile, 'w') as outfile:
        for line in infile:
            fields = line.split("\t")
            for e0, e2, e3 in file_in:
                if e0 == fields[9] and e3 == fields[7]:
                    new_line = '{0}\t{1}\n'.format(line.rstrip(), e2)  # just append the column to the entire line
                    outfile.write(new_line)  # dump to file, don't linger around with an ever-growing string
                    break
Lookup table

If we want to go one step further, we can make a lookup table from file_in. The idea is that, instead of looping over every element extracted from file_in, we prepare a dictionary whose keys are built from j[0] and j[3] (the fields you compare on) and whose values are j[2]. That way the lookup is practically instantaneous, with no loop needed.

The modified code using this logic looks like this:
def make_lookup_table(filename):
    lookup = {}
    with open(filename) as f:
        for line in f:  # read rest of lines
            fields = line.split()
            e0, e2, e3 = fields[0], fields[2], fields[3]
            lookup[(e0, e3)] = e2  # use (e0, e3) as key, and e2 as value
    return lookup

inputfile = "/home/lside/Desktop/database_files/Cell_line_final2.bed"  # 2.7GB text file
outpufile = "/home/lside/Desktop/database_files/Cell_line_final3.bed"
lookup = make_lookup_table("/home/lside/Desktop/tf_TPM2.csv")  # 22.5MB text file

with open(inputfile, 'r') as infile:
    with open(outpufile, 'w') as outfile:
        for line in infile:
            fields = line.split("\t")
            value = lookup[(fields[9], fields[7])]
            new_line = '{0}\t{1}\n'.format(line.rstrip(), value)  # just append the column to the entire line
            outfile.write(new_line)  # dump to file, don't linger around with an ever-growing string
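One caveat with the lookup version: indexing with lookup[...] raises KeyError for any (column 10, column 8) pair that never occurs in the lookup file. A minimal sketch of a more tolerant loop body, assuming you would rather skip such lines, uses dict.get:

# dict.get returns None instead of raising KeyError on a miss,
# so lines without a matching TPM entry are simply skipped.
value = lookup.get((fields[9], fields[7]))
if value is not None:
    outfile.write('{0}\t{1}\n'.format(line.rstrip(), value))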
I would like to suggest an unconventional solution using SQL. First, create two tables that store your data and the row numbers.
import sqlite3

conn = sqlite3.connect(':memory:')  # you may consider a file if short on RAM
c = conn.cursor()
c.execute('CREATE TABLE table1 (line INT, col1, col4);')
c.execute('CREATE TABLE table2 (line INT, col8, col10);')
conn.commit()
Then read the lines from your files and write the rows into the database:
for index, line in enumerate(open('tf_TPM2.csv')):
    tokens = line.split()
    c.execute('INSERT INTO table1 VALUES (?, ?, ?);', (index, tokens[0], tokens[3]))
conn.commit()

for index, line in enumerate(open('Cell_line_final2.bed')):
    tokens = line.split()
    c.execute('INSERT INTO table2 VALUES (?, ?, ?);', (index, tokens[7], tokens[9]))
conn.commit()
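Note that, as written, the query below has no index to use, so SQLite may fall back to nested-loop scans over both tables. Creating indexes on the compared columns first (my own addition; the index names are made up) should help considerably:

# Index the join columns so the WHERE clause can use index lookups
# instead of scanning table1 once for every row of table2.
c.execute('CREATE INDEX idx_table1 ON table1 (col1, col4);')
c.execute('CREATE INDEX idx_table2 ON table2 (col10, col8);')
conn.commit()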
Finally, issue the query that checks which rows have matching values and fetches the row numbers.
query = c.execute(
    'SELECT table2.line, table1.line '
    'FROM table1, table2 '
    'WHERE table1.col1 == table2.col10 AND table1.col4 == table2.col8 '
    'ORDER BY table2.line;'
)
while True:
    result = query.fetchone()
    if result is None:
        break
    # print result to file
The result will contain the row numbers, but you can also store and query the other columns.
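For instance, to produce the desired output directly, table1 could also store the TPM value (tokens[2]), so the join returns it alongside the row number. A sketch of that variant, reusing the names from above:

# Variant: keep the TPM value in table1 as well (this replaces the earlier
# CREATE TABLE and INSERT for table1), so the join can return it directly.
c.execute('CREATE TABLE table1 (line INT, col1, col4, tpm);')
for index, line in enumerate(open('tf_TPM2.csv')):
    tokens = line.split()
    c.execute('INSERT INTO table1 VALUES (?, ?, ?, ?);',
              (index, tokens[0], tokens[3], tokens[2]))
conn.commit()

query = c.execute(
    'SELECT table2.line, table1.tpm '
    'FROM table1, table2 '
    'WHERE table1.col1 == table2.col10 AND table1.col4 == table2.col8 '
    'ORDER BY table2.line;'
)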
Here is another example, using a set for the lookup:
def main():
    f = Filter(TPM_fn='tf_TPM2.bed', final_fn='Cell_line_final2.bed',
               save_fn='Cell_line_final3.bed')

class Filter:
    def __init__(self, **kwargs):
        self.args = kwargs
        self.read_TPM()
        with open(self.args['save_fn'], 'w') as outfile:
            with open(self.args['final_fn'], 'r') as infile:
                self.read_infile(infile, outfile)

    def read_infile(self, infile, outfile):
        for line in infile:
            fields = line.split()
            key = fields[9] + fields[7]
            if key in self.tpm:
                outfile.write(line)
        return

    def read_TPM(self):
        fn = self.args['TPM_fn']
        tpm = set()
        with open(fn) as f:
            for line in f:
                fields = line.split()
                if len(fields) != 4:
                    continue
                key = fields[0] + fields[3]
                tpm.add(key)
        self.tpm = tpm

main()
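Note that a set only answers membership, so this version filters the matching lines rather than appending the TPM column. If the extra column is needed, the same class works with a dict; a minimal sketch of the two method bodies, mirroring the lookup-table answer above:

    def read_TPM(self):
        # dict instead of set: map each key to its TPM value (fields[2])
        tpm = {}
        with open(self.args['TPM_fn']) as f:
            for line in f:
                fields = line.split()
                if len(fields) != 4:
                    continue
                tpm[fields[0] + fields[3]] = fields[2]
        self.tpm = tpm

    def read_infile(self, infile, outfile):
        for line in infile:
            fields = line.split()
            key = fields[9] + fields[7]
            if key in self.tpm:
                # append the matching TPM value as a new column
                outfile.write('{0}\t{1}\n'.format(line.rstrip(), self.tpm[key]))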