我正在处理一项需要在python中进行多处理的任务,我需要通过将已处理的文档ID写入文件(进程之间共享的单个文件(来跟踪状态。
我使用下面的代码片段实现了一个简单的版本。在代码中,我将一些Id存储在一个名为question
的变量中,即共享文件f
。在主方法中,我会将question
拆分为可以并行处理的可能块。
这样做对吗?
from multiprocessing import Pool
from multiprocessing import Queue
def reader(val):
pqueue.put(val)
def writer():
a = pqueue.get()
f = open("num.txt",'a')
f.write(str(a))
f.write("n")
f.close()
def main():
global question
global pqueue
pqueue = Queue() # writer() writes to pqueue from _this_ process
processes = []
question = [16,0,1,2,3,4,5,6,7,8,10,11,12,13,14,15]
cores=5
loww=0
chunksize = int((len(question)-loww)/cores)
splits = []
for i in range(cores):
splits.append(loww+1+((i)*chunksize))
splits.append(len(question)+1)
print(splits)
args = []
for i in range(cores):
a=[]
arguments = (i, splits[i], splits[i+1])
a.append(arguments)
args.append(a)
print(args)
p = Pool(cores)
p.map(call_process, args)
p.close()
p.join
def call_process(args):
lower=args[0][1]
upper=args[0][2]
for x in range(lower,upper):
a = question[x-1]
try:
pass
except:
continue
#write item to file
print(f,'a = ',a)
reader(a)
writer()
main()
注意:代码似乎不起作用。
迟早会有一个进程试图打开一个文件,而另一个进程正在写入该文件,事情就会破裂。
相反,我的策略是:
-
启动一个进程,称之为";编年史家";,其监视队列中的传入比特&碎片,每次有东西进来,都要写到文件中。
-
启动工人。每次一个工人完成,推一些位&分段到前面提到的队列中。然后继续下一个任务(从而将所有文件打开、写入和关闭过程交给"记录者"(
-
让他们所有人监视一个名为"事件"的事件;stop_and_drop_dead";。主进程可以设置((此事件,而子进程在看到事件设置后,会优雅地结束自己。