线程队列和多处理辅助



前言:这是我第一次尝试使用这些工具

上下文:我有一个非常大的文件正在尝试处理。 所以我试图将文件分解成更小的块。 然后将这些文件加载到队列中进行处理。

目标是加快这是一个非常缓慢的过程。

法典:

import lifetimes
import os
import pandas
import Queue
import threading
import multiprocessing
import glob
import subprocess

#move master to processing dir
os.system("cp /data/ltv-testing1.csv /data/out")
#break master csv into 1 million row chunks
subprocess.call(['bash', '/home/ddewberry/LTV_CSV_Split.sh'])
#remove master file
os.remove("/data/out/ltv-testing1.csv")
os.chdir("/data/out")

# Create List of Files
worker_data = glob.glob('split_*')
#build queue with file list
q = Queue.Queue(worker_data)
#import tools for data processing
from lifetimes.utils import summary_data_from_transaction_data
#define worker for threads
def worker(outfile = '/data/in/Worker.csv'):
    while True:
        item = q.get()
        data = pandas.read_csv(item)
        summary = summary_data_from_transaction_data(data, data[[2]], data[[1]])
        summary.to_csv(outfile%s % (item))
        q.task_done()
cpus=multiprocessing.cpu_count() #detect number of cores
print("Creating %d threads" % cpus)
for i in range(cpus):
     t = threading.Thread(target=worker)
     t.daemon = True
     t.start()
q.join()
#clean up
for row in worker_data:
    os.remove(row)

问题:

我没有收到任何错误消息,但它根本不起作用。 (它基本上什么都不做)

我对我做错了什么或我需要修复什么感到非常困惑。

import lifetimes
import os
import pandas
import Queue
import threading
import multiprocessing
import glob
import subprocess

#move master to processing dir
os.system("cp /data/ltv-testing1.csv /data/out")
#break master csv into 1 million row chunks
subprocess.call(['bash', '/home/ddewberry/LTV_CSV_Split.sh'])
#remove master file
os.remove("/data/out/ltv-testing1.csv")
os.chdir("/data/out")

# Create List of Files
worker_data = glob.glob('split_*')
# rename to csv
for row in worker_data:
    os.rename(row, row+'.csv')
worker_data1 = glob.glob('split_*')
#build queue with file list
q = Queue.Queue()
for files in worker_data1:
    q.put(files)

相关内容

  • 没有找到相关文章

最新更新