如何通过基于特定列的值将大型文件进行多线程,多线程



我已经为生物过程写了一个Python程序https://codereview.stackexchange.com/questions/questions/186396/solve-phase-phase-phase-phase-phase-state-betwo----------- - 使用马尔科夫 - 转变proba。

如果您研究该程序,您可以看到该程序一次从两条连续的行(或键,阀(计算数据中需要大量时间。我不是将整个代码放在这里,但是为简单起见,我正在创建模拟文件和模拟程序(下面给出(,该程序在最简单的层面上的行为相似。在此模拟程序中,我正在计算len(vals)列,然后将其写回输出文件。

由于计算在原始程序中执行for (k1, v1) and (k2, v2) ....时(链接上(我想通过 - 1((>以最快的方式读取整体数据 2(将数据分为块,将数据划分为唯一的chr字段 3(进行计算 4(它回到文件。那么,我该怎么做?

在给定的模拟文件中,计算太简单了,无法被gpu/cpu绑定,但是我只想知道如果需要的话,我该怎么做。

注意:我有太多的人问我要实现什么 - 我正在尝试多进程/线程给定的问题。如果我将原始的整个大型程序放在这里,没有人会看它。因此,让我们锻炼这个小文件和小python程序。

以下是我的代码和数据:

my_data = '''chrtpostidxtvals
2t23t4tabcd
2t25t7tatg
2t29t8tct
2t35t1txylfz
3t37t2tmnost
3t39t3tpqr
3t41t6trtuv
3t45t5tlfghef
3t39t3tpqr
3t41t6trtu
3t45t5tlfggg
4t25t3tpqrp
4t32t6trtu
4t38t5tlfgh
4t51t3tpqr
4t57t6trtus
'''

def manipulate_lines(vals):
    vals_len = len(vals[3])
    return write_to_file(vals[0:3], vals_len)
def write_to_file(a, b):
    print(a,b)
    to_file = open('write_multiprocessData.txt', 'a')
    to_file.write('t'.join(['t'.join(a), str(b), 'n']))
    to_file.close()
def main():
    to_file = open('write_multiprocessData.txt', 'w')
    to_file.write('t'.join(['chr', 'pos', 'idx', 'vals', 'n']))
    to_file.close()
    data = my_data.rstrip('n').split('n')

    for lines in data:
        if lines.startswith('chr'):
            continue
        else:
            lines = lines.split('t')
        manipulate_lines(lines)

if __name__ == '__main__':
    main()

使用多个进程处理数据时要处理的问题是保留顺序。Python提出了一种使用multiprocessing.Pool来处理此操作的相当不错的方法,该方法可用于map输入数据上的过程。然后,这将照顾按顺序返回结果。

但是,处理可能仍然过失,因此要正确使用它,仅处理,并且在子过程中不应运行IO访问。因此,要在您的情况下使用它,需要执行一小段重写您的代码,而这些IO操作都会在主要过程中进行:

from multiprocessing import Pool
from time import sleep
from random import randint
my_data = '''chrtpostidxtvals
2t23t4tabcd
2t25t7tatg
2t29t8tct
2t35t1txylfz
3t37t2tmnost
3t39t3tpqr
3t41t6trtuv
3t45t5tlfghef
3t39t3tpqr
3t41t6trtu
3t45t5tlfggg
4t25t3tpqrp
4t32t6trtu
4t38t5tlfgh
4t51t3tpqr
4t57t6trtus
'''
def manipulate_lines(vals):
    sleep(randint(0, 2))
    vals_len = len(vals[3])
    return vals[0:3], vals_len
def write_to_file(a, b):
    print(a,b)
    to_file = open('write_multiprocessData.txt', 'a')
    to_file.write('t'.join(['t'.join(a), str(b), 'n']))
    to_file.close()
def line_generator(data):
    for line in data:
        if line.startswith('chr'):
            continue
        else:
           yield line.split('t')
def main():
    p = Pool(5)
    to_file = open('write_multiprocessData.txt', 'w')
    to_file.write('t'.join(['chr', 'pos', 'idx', 'vals', 'n']))
    to_file.close()
    data = my_data.rstrip('n').split('n')
    lines = line_generator(data)
    results = p.map(manipulate_lines, lines)
    for result in results:
        write_to_file(*result)
if __name__ == '__main__':
    main()

此程序在其不同的chr值之后不会拆分列表,而是通过条目处理条目,直接从最大5(参数为 Pool(子过程中的列表进行处理。

显示数据仍处于预期顺序中,我在manipulate_lines函数中添加了随机的睡眠延迟。这显示了该概念,但可能无法正确看出加速,因为睡眠过程允许另一个人并行运行,而计算较重的过程将在其所有运行时间内使用CPU。

可以看出,一旦map呼叫返回,就必须完成文件的写作,这确保所有子过程均已终止并返回结果。场景后面的这种通信有很多开销,因此要使这是有益的,计算部分必须大大比写阶段更长,并且它不能生成 too 要写入的大量数据以写入文件。

此外,我还分解了发电机中的for环路。因此,可以根据要求获得到multiprocessing.Pool的输入。另一种方法是预处理data列表,然后将该列表直接传递给Pool。但是,我发现发电机解决方案会更好,并且具有较小的峰值内存消耗。

另外,对多线程与多处理的评论;只要您进行计算繁重的操作,就应该使用多处理,至少在理论上,该过程允许这些过程在不同的机器上运行。此外,在CPYTHON中 - 最常用的Python实现 - 键入另一个问题,即全局解释器锁(GIL(。这意味着只有一个线程可以一次执行,因为解释器会阻止所有其他线程的访问。(有一些例外,例如,在使用C中写入的模块时,例如Numpy。在这些情况下,可以在进行Numpy计算时释放GIL,但通常情况并非如此。(因此,线程主要用于您程序的情况。被卡住了,等待缓慢的,排序,io。(插座,终端输入等(

我只使用了几次线程,但我没有在下面测试过此代码,但是从快速浏览中,for循环确实是唯一可以从线程中受益的地方。

我会让其他人决定。

import threading
my_data = '''chrtpostidxtvals
2t23t4tabcd
2t25t7tatg
2t29t8tct
2t35t1txylfz
3t37t2tmnost
3t39t3tpqr
3t41t6trtuv
3t45t5tlfghef
3t39t3tpqr
3t41t6trtu
3t45t5tlfggg
4t25t3tpqrp
4t32t6trtu
4t38t5tlfgh
4t51t3tpqr
4t57t6trtus
'''

def manipulate_lines(vals):
    vals_len = len(vals[3])
    return write_to_file(vals[0:3], vals_len)
def write_to_file(a, b):
    print(a,b)
    to_file = open('write_multiprocessData.txt', 'a')
    to_file.write('t'.join(['t'.join(a), str(b), 'n']))
    to_file.close()
def main():
    to_file = open('write_multiprocessData.txt', 'w')
    to_file.write('t'.join(['chr', 'pos', 'idx', 'vals', 'n']))
    to_file.close()
    data = my_data.rstrip('n').split('n')
    for lines in data:
        if not lines.startswith('chr'):
            lines = lines.split('t')
        threading.Thread(target = manipulate_lines, args = (lines)).start()

if __name__ == '__main__':
    main()

相关内容

  • 没有找到相关文章

最新更新