从一个巨大的文本文件中获取数据,以有效地替换另一个巨大文本文件中的数据(Python)



我已经编程了几个月,所以我不是专家。我有两个巨大的文本文件(omni,~20 GB,~2.5M行;dbSNP,~10 GB,~60M行)。它们具有前几行,不一定是制表符分隔的,以"#"(标题)开头,其余行组织在制表符分隔的列中(实际数据)。

每行的前两列包含染色体编号和在染色体上的位置,而第三列包含识别码。在"omni"文件中,我没有 ID,所以我需要在 dbSNP 文件(数据库)中找到位置,并创建第一个使用 ID 完成的文件的副本。

由于内存限制,我决定逐行读取这两个文件,然后从最后一行读取重新开始。我对我的代码的效率不满意,因为我觉得它比它可能的速度慢。我很确定这是我的错,因为缺乏经验。有没有办法使用 Python 让它更快?问题可能是文件的打开和关闭吗?

我通常在 GNOME 终端(Python 2.7.6,Ubuntu 14.04)中启动脚本,如下所示:

python -u Replace_ID.py> Replace.log 2> Replace.err

提前非常感谢你。

omni (Omni 示例):


#CHROM POS ID REF ALT ...
1 534247 . 嘟嘟...
...

dbSNP (dbSNP 示例):


#CHROM POS ID REF ALT ...
1 10019 rs376643643 TA T ...
...

输出应与 Omni 文件完全相同,但在位置后带有 rs ID。

法典:

SNPline = 0    #line in dbSNP file
SNPline2 = 0    #temporary copy
omniline = 0    #line in omni file
line_offset = []    #beginnings of every line in dbSNP file (stackoverflow.com/a/620492)
offset = 0
with open("dbSNP_build_141.vcf") as dbSNP: #database
    for line in dbSNP:
        line_offset.append(offset)
        offset += len(line)
    dbSNP.seek(0)
with open("Omni_replaced.vcf", "w") as outfile:     
    outfile.write("")       
with open("Omni25_genotypes_2141_samples.b37.v2.vcf") as omni:  
    for line in omni:           
        omniline += 1
        print str(omniline) #log
        if line[0] == "#":      #if line is header
            with open("Omni_replaced.vcf", "a") as outfile:
                outfile.write(line) #write as it is
        else:
            split_omni = line.split('t') #tab-delimited columns
            with open("dbSNP_build_141.vcf") as dbSNP:
                SNPline2 = SNPline          #restart from last line found
                dbSNP.seek(line_offset[SNPline])    
                for line in dbSNP:
                    SNPline2 = SNPline2 + 1 
                    split_dbSNP = line.split('t')  
                    if line[0] == "#":
                        print str(omniline) + "#" + str(SNPline2) #keep track of what's happening.
                        rs_found = 0    #it does not contain the rs ID
                    else:
                        if split_omni[0] + split_omni[1] == split_dbSNP[0] + split_dbSNP[1]:    #if chromosome and position match
                            print str(omniline) + "." + str(SNPline2) #log
                            SNPline = SNPline2 - 1
                            with open("Omni_replaced.vcf", "a") as outfile:
                                split_omni[2] = split_dbSNP[2]  #replace the ID
                                outfile.write("t".join(split_omni)) 
                            rs_found = 1    #ID found
                            break        
                        else:
                            rs_found = 0    #ID not found
                if rs_found == 0:   #if ID was not found in dbSNP, then:
                    with open("Omni_replaced.vcf", "a") as outfile:
                        outfile.write("t".join(split_omni)) #keep the line unedited
                else:   #if ID was found:
                    pass    #no need to do anything, line already written
    print "End."

这是我对你的问题的贡献。首先,这是我对您的问题的了解,只是为了检查我是否正确:您有两个文件,每个文件都是制表分隔值文件。第一个是dbSNP,包含数据,其第三列是对应于基因染色体数(第1列)和基因在染色体上的位置(第2列)的标识符。

因此,任务包括获取 omni 文件并使用来自 dbNSP 文件的所有值(基于染色体编号和基因位置)填写 ID 列。

问题来自文件的大小。您尝试保留在每行的文件位置以进行查找并直接转到良好的行,以避免 tu 将所有 dbnsp 文件放入内存中。由于由于打开多个文件,这种方法对您来说不够快,因此这是我的建议。

解析一次 dbNSP 文件以仅保留基本信息,即耦合(number,position):ID。从您的示例中对应于:

1   534247  rs201475892
1   569624  rs6594035
1   689186  rs374789455

就内存而言,这对应于文件大小的不到 10%,因此从 20GB 文件开始,您将加载小于 2GB 的内存,这可能是负担得起的(不知道您之前尝试过哪种加载)。

所以这是我的代码来做到这一点。不要犹豫,要求解释,因为与您不同,我使用对象编程。

import argparse
#description of this script
__description__ = "This script parse a Database file in order to find the genes identifiers and provide them to a other file.[description to correct]nTake the IDs from databaseFile and output the targetFile content enriched with IDs"
# -*- -*-  -*- -*- -*- -*- -*- -*- -*- -*- -*- -*- -*- -*- -*- -*- -*- -*- -*- -*- -*- -*- -*- -*- 
#classes used to handle and operate on data
class ChromosomeIdentifierIndex():
    def __init__(self):
        self.chromosomes = {}
    def register(self, chromosomeNumber, positionOnChromosome, identifier):
        if not chromosomeNumber in self.chromosomes:
            self.chromosomes[chromosomeNumber] = {}
        self.chromosomes[chromosomeNumber][positionOnChromosome] = identifier
    def __setitem__(self, ref, ID):
        """ Allows to use alternative syntax to chrsIndex.register(number, position, id) : chrsIndex[number, position] = id """
        chromosomeNumber, positionOnChromosome = ref[0],ref[1]
        self.register(chromosomeNumber, positionOnChromosome, ID)
    def __getitem__(self, ref):
        """ Allows to get IDs using the syntax: chromosomeIDindex[chromosomenumber,positionOnChromosome] """
        chromosomeNumber, positionOnChromosome = ref[0],ref[1]
        try:
            return self.chromosomes[chromosomeNumber][positionOnChromosome]
        except:
            return "."
    def __repr__(self):
        for chrs in self.chromosomes.keys():
            print "Chromosome : ", chrs
            for position in self.chromosomes[chrs].keys():
                print "t", position, "t", self.chromosomes[chrs][position]
class Chromosome():
    def __init__(self, string):
        self.values   = string.split("t")
        self.chrs     = self.values[0]
        self.position = self.values[1]
        self.ID       = self.values[2]
    def __str__(self):
        return "t".join(self.values)
    def setID(self, ID):
        self.ID = ID
        self.values[2] = ID
class DefaultWritter():
    """ Use to print if no output is specified """
    def __init__(self):
        pass
    def write(self, string):
        print string
    def close(self):
        pass
# -*- -*-  -*- -*- -*- -*- -*- -*- -*- -*- -*- -*- -*- -*- -*- -*- -*- -*- -*- -*- -*- -*- -*- -*- 
#The code executed when the scrip is called
if __name__ == "__main__":
    #initialisation
    parser = argparse.ArgumentParser(description = __description__)
    parser.add_argument("databaseFile"  , help="A batch file that contains many informations, including the IDs.")
    parser.add_argument("targetFile"    , help="A file that contains informations, but miss the IDs.")
    parser.add_argument("-o", "--output", help="The output file of the script. If no output is specified, the output will be printed on the screen.")
    parser.add_argument("-l", "--logs"  , help="The log file of the script. If no log file is specified, the logs will be printed on the screen.")
    args = parser.parse_args()
    output = None
    if args.output == None:
        output = DefaultWritter()
    else:
        output = open(args.output, 'w')
    logger = None
    if args.logs == None:
        logger = DefaultWritter()
    else:
        logger = open(args.logs, 'w')
    #start of the process
    idIndex = ChromosomeIdentifierIndex()
    #build index by reading the database file.
    with open(args.databaseFile, 'r') as database:
        for line in database:
            if not line.startswith("#"):
                chromosome = Chromosome(line)
                idIndex[chromosome.chrs, chromosome.position] = chromosome.ID
    #read the target, replace the ID and output the result
    with open(args.targetFile, 'r') as target:
        for line in target:
            if not line.startswith("#"):
                chromosome = Chromosome(line)
                chromosome.setID(idIndex[chromosome.chrs, chromosome.position])
                output.write(str(chromosome))
            else:
                output.write(line)
    output.close()
    logger.close()          

主要思想是解析一次 dbNSP 文件并收集字典中的所有 ID。然后逐行读取全文件并输出结果。

你可以像这样调用脚本:

python replace.py ./dbSNP_example.vcf ./Omni_example.vcf -o output.vcf

我使用和导入来处理参数的 argparse 模块也提供自动帮助,因此参数描述可用于

python replace.py -h

python replace.py --help

我相信他的方法会比你更快,因为我读过一次文件,稍后再在 RAM 上工作,我邀请你测试一下。

注意:我不知道你是否熟悉对象编程,所以我不得不提到,这里所有的类都在同一个文件中,以便发布在堆栈溢出上。在现实生活中的用例中,好的做法是将所有类放在单独的文件中,例如"Chromosome.py"、"ChromosomeIdentifierIndex.py"和"DefaultWritter.py",然后将它们导入到"replace.py"文件中。

最新更新