我想在终止进程之前保存所有数据。我使用的是 Windows 机器。如果不终止进程,程序工作得很好。我尝试过 signal
库,但它只适用于 UNIX 系统——问题正出在这里:我不知道如何在 Windows 上拦截信号。使用 atexit
库也没有帮助。我甚至尝试过调用 save_stuff
方法,但同样无济于事。有人知道该怎么做吗?
主要任务是在一段时间后停止程序执行,并收集进程外部的所有可用数据。
from multiprocessing import Queue, Process
class Worker:
    """Toy workload: counts from 0 to n - 1, remembering the last value."""

    def __init__(self):
        # Placeholder value until work() has run at least one iteration.
        self.workers = 1

    def work(self, n):
        """Count from 0 through n - 1, recording and printing each value."""
        current = 0
        while current < n:
            self.workers = current
            print(current)
            current += 1

    def __str__(self):
        return str(self.workers)
class MyProcess(Process):
    """Process that runs a Worker and tries to share it via a Queue.

    NOTE(review): parent and child run in separate address spaces, so the
    parent's copy of self.worker is never updated by run(); this is the
    bug the question is about.
    """

    def __init__(self, n):
        # Worker and Queue are created in the parent; the child receives
        # pickled copies when the process starts.
        self.worker = Worker()
        self.shared_obj = Queue()
        self.shared_obj.put(self.worker)
        self.args = n
        super().__init__()

    def run(self):
        # Executes in the CHILD process; blocks until work() finishes,
        # so a terminate() from the parent kills it before the put below.
        self.worker.work(self.args)
        # NOTE(review): Queue.get(block, timeout) — the worker passed here
        # is interpreted as the truthy `block` flag, not as an item key.
        self.shared_obj.get(self.worker)
        self.shared_obj.put(self.worker)

    def save_stuff(self):
        # Called from the PARENT (see __main__ below); it only ever sees
        # the stale worker placed on the queue in __init__.
        self.shared_obj.get(self.worker)
        self.shared_obj.put(self.worker)
        print('collect data')
if __name__ == '__main__':
    p = MyProcess(1000000)
    p.start()
    # Give the child at most one second to finish on its own.
    p.join(1)
    if p.is_alive():
        # NOTE(review): runs in the parent process, so it cannot capture
        # the child's progress; the queue still holds the initial worker.
        p.save_stuff()
        p.terminate()
        print('killed worker')
        print('shared object ' + str(p.shared_obj.get()))
    else:
        print('he was in time this worker')
        print('shared object ' + str(p.shared_obj.get()))
在您的代码中,run 里的 self.worker.work(self.args)
会一直阻塞,直到整个循环完成。如果直接终止进程,那么向父进程发送结果的那部分代码将永远没有机会运行。
因此,我们需要一种让子进程优雅结束的方法,这样它才能把对象发回父进程。worker.run
不能为此阻塞子进程的主线程,所以下面的代码把它包装在一个额外的线程中。子进程的主线程启动该线程后运行一个 while 循环,既检查管道中是否收到消息,也检查工作线程是否仍然存活。当工作线程自然结束、或父进程发送"毒丸"时,循环会退出。此时即可保存并发送数据,父进程随后通过 .get()
取回 Worker 实例。
import time
import logging
from threading import Thread
from multiprocessing import Process, Pipe
def init_logging(level=logging.DEBUG):
    """Configure the root logger with a process-aware message format.

    Must be called again inside a spawned child process, because a
    "spawn"-started child does not inherit the parent's logging config.

    :param level: minimum level passed to logging.basicConfig.
    """
    # Parentheses are required for implicit string concatenation; the two
    # string literals as separate statements were an IndentationError.
    fmt = ('[%(asctime)s %(levelname)-8s %(processName)s'
           ' %(funcName)s()] --- %(message)s')
    logging.basicConfig(format=fmt, level=level)
class Worker:
    """Busy-loop workload that counts self.n down toward zero."""

    def __init__(self, n):
        # Remaining iterations; may be a float (e.g. 100e6).
        self.n = n

    def run(self):
        """Decrement n once per planned iteration; return self when done."""
        remaining = int(self.n)
        while remaining > 0:
            self.n -= 1
            remaining -= 1
        return self

    def __str__(self):
        return f'{self.n}'

    def __repr__(self):
        return f'Worker(n={self.n})'
class MyProcess(Process):
    """Process whose run() delegates the workload to a daemon thread.

    The extra thread keeps run()'s main loop free to watch the pipe for a
    parent-sent stop request ("poison pill") while the worker counts down,
    so the worker instance can always be sent back before exiting.

    NOTE(review): close() shadows multiprocessing.Process.close() (added
    in Python 3.7), which callers might expect to release process
    resources — confirm this naming is intentional.
    """

    def __init__(self, n, log_level=logging.DEBUG):
        super().__init__()
        self.args = n
        self.log_level = log_level
        # Both are populated inside the child process, in run().
        self.worker = None
        self.worker_thread = None
        # parent_conn stays with the parent, child_conn with the child.
        self.parent_conn, self.child_conn = Pipe()
        logging.getLogger().debug('process instantiated')

    def run(self):
        # Child-process entry point; re-init logging because a spawned
        # child does not inherit the parent's logging configuration.
        init_logging(self.log_level)
        logging.getLogger().debug('process started')
        self.worker = Worker(self.args)
        self.worker_thread = Thread(target=self.worker.run)
        self.worker_thread.daemon = True  # must not outlive this process
        self.worker_thread.start()
        # Loop until the parent sends anything (poison pill) or the worker
        # finishes naturally; join(0.5) doubles as the polling interval.
        while not self.child_conn.poll() and self.worker_thread.is_alive():
            self.worker_thread.join(0.5)  # heartbeat for checking
        self._save()

    def _save(self):
        """Send worker instance to parent."""
        logging.getLogger().debug('sending instance to parent')
        self.child_conn.send(self.worker)
        self.child_conn.close()

    def close(self):
        """Close process and receive result."""
        logging.getLogger().debug('closing process')
        # The actual value we are sending to child does not matter because
        # the while loop in `run` will break upon receipt of any object.
        self.parent_conn.send('POISON')

    def get(self):
        """Get result from child."""
        logging.getLogger().debug('get result from child')
        # Blocks until the child's _save() sends the worker over the pipe.
        self.worker = self.parent_conn.recv()
        return self.worker
我在 Linux 下测试了这段代码,但把 start_method 设置成了"spawn"(这是 Windows 上的默认值),所以我预计它在 Windows 上也能运行。
if __name__ == '__main__':
    init_logging()
    logger = logging.getLogger()
    p = MyProcess(100e6)  # try 10 vs 100e6 to toggle behaviour
    p.start()
    # Let the child work for up to two seconds before intervening.
    p.join(2)
    if p.is_alive():
        p.close()  # send the poison pill so the child saves and exits
        p.get()    # blocks until the child sends its worker back
        logger.info('killed worker')
        time.sleep(0.1)  # just to keep stdout in order
        print('shared object ' + repr(p.worker))
    else:
        p.get()
        logger.info('worker was in time')
        time.sleep(0.1)  # just to keep stdout in order
        print('shared object ' + repr(p.worker))
    assert isinstance(p.worker, Worker)
    p.join()
示例输出:
[2018-09-08 05:27:46,316 DEBUG MainProcess __init__()] --- process instantiated
[2018-09-08 05:27:46,370 DEBUG MyProcess-1 run()] --- process started
[2018-09-08 05:27:48,321 DEBUG MainProcess close()] --- closing process
[2018-09-08 05:27:48,322 DEBUG MainProcess get()] --- get result from child
[2018-09-08 05:27:48,396 DEBUG MyProcess-1 _save()] --- sending instance to parent
[2018-09-08 05:27:48,402 INFO MainProcess <module>()] --- killed worker
shared object Worker(n=82683682.0)
Process finished with exit code 0
注意,worker.n
在调用 .close()
之前的 2 秒内已从 1 亿倒数到约 8268 万。
从父进程调用 p.save_stuff() 将不起作用。这两个进程运行在不同的地址空间中,父进程持有的副本不会随子进程中属性的修改而更新。由于数据由子进程写入、父进程读取,因此使用共享内存是安全的做法,并应在循环的每次迭代中更新它。以下代码应该能实现您想要的目标。
from multiprocessing import Value, Process
class Worker:
    """Counts upward, mirroring each step into a shared-memory value."""

    def __init__(self):
        # Placeholder value until work() has run at least one iteration.
        self.workers = 1

    def work(self, n, shm):
        """Count from 0 to n - 1, publishing every step to shm.value."""
        step = 0
        while step < n:
            self.workers = step
            shm.value = step
            print(step)
            step += 1

    def __str__(self):
        return str(self.workers)
class MyProcess(Process):
    """Process wrapper that runs a Worker, mirroring progress into shm."""

    def __init__(self, n, shm):
        super().__init__()
        # The worker object itself lives in whichever process runs run();
        # only shm is genuinely shared between parent and child.
        self.worker = Worker()
        self.args = n
        self.shm = shm

    def run(self):
        """Child-process entry point: delegate everything to the worker."""
        self.worker.work(self.args, self.shm)
if __name__ == '__main__':
    # Shared C int ("i" typecode), visible to both parent and child.
    shm = Value("i", 0)
    p = MyProcess(1000000, shm)
    p.start()
    # Give the worker up to one second to finish on its own.
    p.join(1)
    if p.is_alive():
        # Safe to terminate: the progress already lives in shared memory.
        p.terminate()
        print('killed worker')
        print('shared object ' + str(shm.value))
    else:
        print('he was in time this worker')
        print('shared object ' + str(shm.value))