我正在尝试打印此列表中的 id,它们在进程的开始和结束之间有延迟,并且在 queue.get 之间有延迟(我使用线程实现。带共享锁的计时器(。我遇到的问题是,虽然我当前设置的计时器允许我锁定进程,以便在一个进程从队列中获取所有其他进程无法启动的记录后有 2 秒的时间段,但我的程序只关闭 2 个进程在程序运行结束时关闭 4 个进程中的 2 个。我该如何解决此问题,以便所有进程关闭并且程序可以退出。
我在下面的输出显示了这一点,因为我希望还有 2 个"工人关闭"通知:
Process started
Process started
Process started
Process started
begin 1 : 1560891818.0307562
begin 2 : 1560891820.0343137
begin 3 : 1560891822.0381632
end 2 : 3.0021514892578125
end 1 : 6.004615068435669
begin 4 : 1560891824.0439706
begin 5 : 1560891826.0481522
end 4 : 3.004107713699341
end 3 : 6.005637168884277
begin 6 : 1560891828.0511773
begin 7 : 1560891830.0557532
end 6 : 3.0032966136932373
end 5 : 6.006829261779785
begin 8 : 1560891832.056265
begin 9 : 1560891834.0593572
end 8 : 3.011284112930298
end 7 : 6.005618333816528
begin 10 : 1560891836.0627353
end 10 : 3.0014095306396484
worker closed
end 9 : 6.000675916671753
worker closed
import multiprocessing
from time import sleep, time
import threading
class TEMP:
lock = multiprocessing.Lock()
id_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
queue = multiprocessing.Queue(10)
DELAY = 2
def mp_worker(self, queue, lock):
while queue.qsize() > 0:
lock.acquire()
# Release the lock after a delay
threading.Timer(self.DELAY,lock.release).start()
record = queue.get()
start_time = time()
print("begin {0} : {1}".format(record, start_time))
if (record % 2 == 0):
sleep(3)
else:
sleep(6)
print("end {0} : {1}".format(record, time() - start_time))
threading.Timer.join()
print("worker closed")
def mp_handler(self):
# Spawn two processes, assigning the method to be executed
# and the input arguments (the queue)
processes = [multiprocessing.Process(target=self.mp_worker, args=([self.queue, self.lock]))
for _ in range(4)]
for process in processes:
process.start()
print('Process started')
for process in processes:
process.join()
def start_mp(self):
for id in self.id_list:
self.queue.put(id)
self.mp_handler()
if __name__ == '__main__':
temp = TEMP()
temp.start_mp()
我实际上解决了这个问题。我的代码没有加入的主要原因是因为我的代码正在检查队列是否为空,等待延迟,然后尝试从队列中获取某些内容。这意味着在程序结束时,虽然队列变为空,并且 4 个进程中的 2 个同时成功完成,但其余 2 个进程处于延迟状态。当这个延迟结束时,他们试图从队列中获取一些东西,但由于队列是空的,他们只是阻止了进程代码的其余部分运行,这意味着他们永远无法重新加入。
我通过在进程尝试从队列中获取某些内容之前检查队列是否为空来解决此问题。我的固定工作线程功能如下:
def mp_worker(self, queue, lock):
while not queue.empty():
print(mp.current_process().name)
lock.acquire()
# Release the lock after a delay
timer = Timer(self.DELAY, lock.release)
timer.start()
if not queue.empty():
record = queue.get(False)
start_time = time()
print("begin {0} : {1}".format(record, start_time))
if (record % 2 == 0):
sleep(3)
else:
sleep(6)
print("end {0} : {1}".format(record, time() - start_time))
print("{0} closed".format(mp.current_process().name))