Python(实例化)锁定多个输出



我正在开发Python多线程应用程序。

场景是:

源数据(每小时数千个小文件)包含许多主题的数据(范围1-100)。每一行都以"subject1|col1|col2|…|coln|"开头。目前,用户只对10个(示例)主题感兴趣。但在未来,他们可以根据自己的喜好添加(或删除)更多的主题。

为此,我写了一个程序,创建了一个队列,并将目录中的所有源文件添加到其中。可配置的线程数启动并从队列中获取一个文件,并开始分析数据。我想为每个主题生成一个大文件。因此,由于用户当前想要解析10个主题,我将生成10个输出文件。

由于可以有16-32个线程解析源数据并写入10个输出文件,我不想创建10个锁,每个输出文件一个锁(每个输出文件硬编码subject_lock),因为我想为程序动态开始解析新主题提供灵活性。

我想不出任何方法来动态控制线程锁定到目标输出文件(除了老派的创建空文件触摸文件(带有subject_timestamp用于调试)的方法,该文件是基于主题列表生成的,每隔几秒钟从配置文件中读取一次)。

欢迎提出任何建议或建议。

谢谢,Venkat

听起来可能会有多个线程向单个输出文件写入,因此您希望使写入线程安全,同时如果添加了主题,则允许创建另一个输出文件并将其写入。

我建议让每个线程在写入过程中简单地锁定输出文件。这不会阻止创建另一个输出文件;您只需确保该文件在写入过程中也被锁定即可。然后,无论您有16个或32个线程或介于两者之间的线程,您有多少输出文件都无关紧要——只需确保它们在写入过程中被锁定,因此任何试图在那里写入的其他线程都需要等待。

我会在多个线程上使用多个进程,并且只使用一个进程来写入一个文件。看看这个:

假设您知道将包含所有文件的目录,并且在任何给定时间,所有文件都需要重新处理。此解决方案将为每个主题创建一个新文件

import multiprocessing as mp
import csv
import os
def processFiles(dirpath, outputDirectory, numProcs):
# create processes and queues
fpaths, data, lines, dq = [mp.Queue() for _ in range(4)]
readers = [mp.Process(target=fileProcessor, args=(fpaths, data)) for _ in range(numProcs)]
writer = mp.Process(target=fileWriter, args=(lines, dq))
for r in readers: r.start()
writer.start()
# let the filereaders read all the files
for fpath in glob.glob(os.path.join(dirpath, "*.txt")):
fpaths.put(fpath)
for _ in range(numProcs): fpaths.put(None)

files = {}  # track each subject's output file
done = 0
while done < numProcs:
t = data.get()  # get data from the filereaders
if t is None:  # one of the filereaders died
done += 1
continue
sub, data = t
if sub not in files:
f = open(os.path.join(outputDirectory, "{}.csv".format(sub))
w = csv.writer(f, delimiter="|")
files[sub] = (f,w)
lines.put((files[sub][1], data))  # tell the writer to write out the line into that subject's output file
lines.put(None)
for _sub, (f, _w) in files.items(): f.close()  # close the open files
for r in readers: r.terminate()  # kill all the readers
dq.get()  # wait for the writer to finish
writer.terminate()  # kill the writer

def fileProcessor(qIn, qOut):
for infilepath in iter(qIn.get, None):  # get the filepath of the input file
with open(infilepath) as infile:
for sub, *data in csv.reader(infile, delimiter="|"):
qOut.put((sub, data))  # send back the name of the subject, as well as the data
qOut.put(None)  # time to die

def fileWriter(qIn, qOut):
for f, data in iter(qIn.get, None):  # get the csv writer object, and the data to be written
f.writerow(data)
qOut.put(None)  # all data written. Time to die

最新更新