Python 保存进程(关闭文件句柄)



请看一下这个基本的例子:

def queue_add():
    """Feed every line of 'get_from.txt' into the module-level queue `q`.

    Each line is stripped of surrounding whitespace before being queued.
    The function returns once the whole file has been queued.
    """
    with open('get_from.txt') as f:
        for line in f:
            # Queue objects expose put(), not add().
            q.put(line.strip())
def queue_save():
    """Write every result taken from the module-level queue `q_done` to 'save_to.txt'.

    Open question from the original author: when should the file be closed?
    The loop below has no exit condition -- q_done.get() blocks while the
    queue is empty -- so the `with` block only releases the file if an
    exception unwinds it. An orderly shutdown needs an explicit
    end-of-data signal.
    """
    # 'w' is required: the default mode is read-only and write() would fail.
    with open('save_to.txt', 'w') as save_to:
        while True:
            # Results are published on q_done by worker(); reading from q
            # here would steal unprocessed input from the workers.
            data = q_done.get()
            save_to.write(data)
def worker():
    """Consume items from `q` forever and publish a result on `q_done`.

    Intended to run as a daemon process: the loop never returns.
    """
    while True:
        # Take the next unit of work; blocks while `q` is empty.
        item = q.get()
        # Placeholder for the real processing of `item`.
        q_done.put('processed data')
        # Tell `q` that the item fetched above has been handled.
        q.task_done()
# Shared queues used by the functions above: `q` carries the input lines
# to the workers, `q_done` carries the workers' results.
# NOTE(review): the import that provides `Queue` is not shown in this
# snippet -- confirm which queue class is intended (task_done() is used).
q = Queue()
q_done = Queue()
# The processes are started here (omitted in the original snippet).

所以我的问题是:如何知道 queue_save() 已经处理并保存了 q_done 中的所有数据,从而可以关闭它的文件句柄 save_to?

使用哨兵值如何?

import Queue
import threading
# Sentinel placed on a queue to tell its consumer to stop. It is a unique
# object compared with `is`, so no real data item can be mistaken for it.
END_OF_DATA = object()
def queue_add(q):
    """Read 'get_from.txt' and put each whitespace-stripped line on `q`.

    Args:
        q: queue-like object providing put(); receives one item per line.
    """
    with open('get_from.txt') as source:
        # Lazily strip each line so items are queued as the file is read.
        for entry in (raw.strip() for raw in source):
            q.put(entry)
def process(x):
    """Dummy transformation: return the length of `x` as a text line.

    Args:
        x: any sized object (a stripped input line in this pipeline).

    Returns:
        The decimal length of `x` followed by a newline, so that results
        written back-to-back to a file end up one per line.
    """
    # The terminator must be a real newline; a bare 'n' would glue all
    # records together in the output file.
    return str(len(x)) + '\n'
def worker(q_in, q_out):
    """Transform items from `q_in` with process() and put results on `q_out`.

    Runs until END_OF_DATA is taken from `q_in`, then returns.

    Args:
        q_in: queue of input items; must support get() and task_done().
        q_out: queue receiving one process() result per input item.
    """
    while True:
        data = q_in.get()
        try:
            if data is END_OF_DATA:
                break
            # Publish the result before acknowledging the input, so that
            # once q_in.join() returns every result is already in q_out.
            q_out.put(process(data))
        finally:
            # Every get() is matched by exactly one task_done() -- also for
            # the sentinel and when process() raises -- so q_in.join() can
            # never hang on an unacknowledged item.
            q_in.task_done()
def queue_save(q):
    """Write every item taken from `q` to 'save_to.txt' until END_OF_DATA.

    The file is closed when the sentinel is received, and also if writing
    fails part-way through.

    Args:
        q: queue of strings to write; must support get() and task_done().
    """
    # The context manager guarantees the handle is released even when
    # write() raises.
    with open('save_to.txt', 'w') as save_to:
        while True:
            data = q.get()
            try:
                if data is END_OF_DATA:
                    break
                save_to.write(data)
            finally:
                # Acknowledge every fetched item, sentinel included, so a
                # later q.join() cannot block on it.
                q.task_done()

# Pipeline wiring: reader thread -> q1 -> worker threads -> q2 -> writer thread.
q1 = Queue.Queue()
q2 = Queue.Queue()
n_workers = 4
t1 = threading.Thread(target=queue_add, args=(q1,))
workers = [
    threading.Thread(target=worker, args=(q1, q2))
    for _ in range(n_workers)
]
t3 = threading.Thread(target=queue_save, args=(q2,))

t1.start()
# The loop variable must not be called `worker`: that would rebind the
# module-level worker() function to a Thread object.
for worker_thread in workers:
    worker_thread.start()
t3.start()

# Orderly shutdown, stage by stage:
t1.join()   # 1. the reader has queued every input line
q1.join()   # 2. every input line has been processed
for _ in workers:
    q1.put(END_OF_DATA)   # 3. one sentinel per worker thread
for worker_thread in workers:
    worker_thread.join()
q2.join()   # 4. every result has been written
q2.put(END_OF_DATA)   # 5. tell the writer to close the file and exit
t3.join()

最新更新