如何并行化或制作更快的python脚本



我有一个文本文件操纵的代码。尽管文本文件很大,并且我当前的代码计算时需要30天才能完成。

如果多处理是他唯一的方法,我拥有一个带有40个内核的服务器。

cell_line_final2.bed:

chr1    778704  778912  MSPC_Peak_37509  8.43   cell_line   GM12878  CTCF   ENCSR000AKB CNhs12333   132
chr1    778704  778912  MSPC_Peak_37509  8.43   cell_line   GM12878  CTCF   ENCSR000AKB CNhs12331   132
chr1    778704  778912  MSPC_Peak_37509  8.43   cell_line   GM12878  CTCF   ENCSR000AKB CNhs12332   132
chr1    869773  870132  MSPC_Peak_37508  74.0   cell_line   GM12878  CTCF   ENCSR000AKB CNhs12333   132
...
...

tf_tpm2.bed:

CNhs12333   2228319     4.41    CTCF
CNhs12331   6419919     0.0     HES2
CNhs12332   6579994     0.78    ZBTB48
CNhs12333   8817465     0.0     RERE
...
...

所需的输出是在" cell_line_final2.bed"中添加一列,其中" tf_tpm2.bed"的第一个和第四列匹配" cell_line_final2.bed"的第10和8列。

chr1    778704  778912  MSPC_Peak_37509  8.43   cell_line   GM12878  CTCF   ENCSR000AKB CNhs12333   132   4.41
chr1    778704  778912  MSPC_Peak_37509  8.43   cell_line   GM12878  HES2   ENCSR000AKB CNhs12331   132   0.0
chr1    778704  778912  MSPC_Peak_37509  8.43   cell_line   GM12878  CTCF   ENCSR000AKB CNhs12332   132   0.78
chr1    869773  870132  MSPC_Peak_37508  74.0   cell_line   GM12878  RERE   ENCSR000AKB CNhs12333   132   0.0
...
...

到目前为止我的代码:

def read_file(file):
    with open(file) as f:
        current = []
        for line in f: # read rest of lines
            current.append([x for x in line.split()])
    return(current)

inputfile = "/home/lside/Desktop/database_files/Cell_line_final2.bed" # 2.7GB text file
outpufile = "/home/lside/Desktop/database_files/Cell_line_final3.bed"
file_in = read_file("/home/lside/Desktop/tf_TPM2.csv") # 22.5MB text file
new_line = ""
with open(inputfile, 'r') as infile:
    with open(outpufile, 'w') as outfile:
        for line in infile:
            line = line.split("t")
            for j in file_in:
                if j[0] == line[9] and j[3] == line[7]:
                    new_line = new_line + '{0}t{1}t{2}t{3}t{4}t{5}t{6}t{7}t{8}t{9}t{10}t{11}n'.format(line[0], line[1], line[2],line[3], line[4], line[5],line[6], line[7], line[8], line[9], line[10].rstrip(), j[2])
                    continue
        outfile.write(new_line)

我同意评论,说这不应该需要30天的时间才能运行,以便瓶颈应该在其他地方。可能最大的罪犯是您正在构建的巨大字符串,而不仅仅是将每条线倾倒在每次迭代(^(。

注意

(^(最大的犯罪者更有可能是内部循环中的continue语句,因为这始终会迫使代码将当前行与查找文件中的所有元素进行比较,而不是停止在第一场比赛中。用 break替换应该是要走的方法。

在这里我要做的事情,看看它的速度有多快:

def read_file(filename):
    with open(filename) as f:
        current = []
        for line in f: # read rest of lines
            e0, e2, e3 = line.split()[0], line.split()[2], line.split()[3]
            current.append((e0, e2, e3))  # you only use these three elements
    return current

inputfile = "/home/lside/Desktop/database_files/Cell_line_final2.bed" # 2.7GB text file
outpufile = "/home/lside/Desktop/database_files/Cell_line_final3.bed"
file_in = read_file("/home/lside/Desktop/tf_TPM2.csv") # 22.5MB text file
with open(inputfile, 'r') as infile:
    with open(outpufile, 'w') as outfile:
        for line in infile:
            line = line.split("t")
            for e0, e2, e3 in file_in:
                if e0 == line[9] and e3 == line[7]:
                    new_line = '{0}t{1}n'.format(line.rstrip(), e2)  # just append the column to the entire line
                    outfile.write(new_line)  # dump to file, don't linger around with an ever-growing string
                    break

查找表

如果我们想进一步走,我们可以从file_in制作一个查找表。这个想法是,我们不必从file_in提取的每个元素循环,而是准备一个字典,其中从j[0],j[3]准备键的字典 - 这是您比较的字段 - 值为j[2]。这样,查找实际上将是瞬时的,无需循环。

使用此逻辑的修改代码看起来像:

def make_lookup_table(filename):
    lookup = {}
    with open(filename) as f:
        for line in f: # read rest of lines
            e0, e2, e3 = line.split()[0], line.split()[2], line.split()[3]
            lookup[(e0, e3)] = e2  # use (e0,e3) as key, and e2 as value
    return lookup

inputfile = "/home/lside/Desktop/database_files/Cell_line_final2.bed" # 2.7GB text file
outpufile = "/home/lside/Desktop/database_files/Cell_line_final3.bed"
lookup = make_lookup_table("/home/lside/Desktop/tf_TPM2.csv") # 22.5MB text file
with open(inputfile, 'r') as infile:
    with open(outpufile, 'w') as outfile:
        for line in infile:
            line = line.split("t")
            value = lookup[(line[9],line[7])]
            new_line = '{0}t{1}n'.format(line.rstrip(), value)  # just append the column to the entire line
            outfile.write(new_line)  # dump to file, don't linger around with an ever-growing string

我想建议使用SQL的非常规的解决方案。拳头,创建两个将您的数据和行号存储的表。

import sqlite3
conn = sqlite3.connect(':memory:')  # you may consider file if short on RAM
c = conn.cursor()
c.execute('CREATE TABLE table1 (line INT, col1, col4);')
c.execute('CREATE TABLE table2 (line INT, col8, col10);')
conn.execute()

然后,从文件中读取行,然后将行写入数据库

for index, line in enumerate(open('tf_TPM2.csv')):
    tokens = line.split()
    c.execute('INSERT INTO table1 VALUES (?, ?, ?);', (index, tokens[0], tokens[3])
conn.commit()
for index, lint in enumerate(open('Cell_line_final2.bed')):
    tokens = line.split()
    c.execute('INSERT INTO table2 VALUES (?, ?, ?);', (index, tokens[7], tokens[9])
conn.commit()

最后,发出查询,该查询检查哪个行具有匹配值并获取行号。

query = c.execute(
    'SELECT table2.line, table1.line '
    'FROM table1, table2 '
    'WHERE table1.col1 == table2.col10 AND table1.col4 == table2.col8 '
    'ORDER BY table2.line;'
)
while True:
    result = query.fetchone()
    if result is None: break
    # print result to file

结果将包含行号,但您也可以放置并查询其他列。

这是使用set进行查找的另一个示例:

def main():
    f = Filter(TPM_fn='tf_TPM2.bed', final_fn='Cell_line_final2.bed',
               save_fn='Cell_line_final3.bed')
class Filter:
    def __init__(self, **kwargs):
        self.args = kwargs
        self.read_TPM()
        with open(self.args['save_fn'], 'w') as outfile:
            with open(self.args['final_fn'], 'r') as infile:
                self.read_infile(infile, outfile)

    def read_infile(self, infile, outfile):
        for line in infile:
            fields = line.split()
            key = fields[9]+fields[7]
            if key in self.tpm:
                outfile.write(line)
        return 

    def read_TPM(self):
        fn = self.args['TPM_fn']
        tpm = set()
        with open(fn) as f:
            for line in f:
                fields = line.split()
                if len(fields) != 4:
                    continue 
                key = fields[0]+fields[3]
                tpm.add(key)
        self.tpm = tpm
main()

相关内容

  • 没有找到相关文章

最新更新