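I have several processes that I intend to run in a while loop. Basically, some of the processes collect data, and before they stop I want them to save that data to a csv or json file. What I do right now is use super to override the join method of the multiprocessing.Process class.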
import multiprocessing


class Processor(multiprocessing.Process):
    def __init__(self, arguments):
        multiprocessing.Process.__init__(self)

    def run(self):
        self.main_function()

    def main_function(self):
        while True:
            pass  # do things to incoming data

    def function_on_join(self):
        pass  # do one last thing before the process ends

    def join(self, timeout=None):
        self.function_on_join()
        super(Processor, self).join(timeout=timeout)
Is there a better way / the right way / a more pythonic way to do this?
I suggest you take a look at the concurrent.futures module, provided you can describe your work as a list of tasks to be completed by a group of workers.
Task-based multiprocessing
When you have a sequence of jobs (e.g. a list of filenames) and you want to process them in parallel, you can do it as follows:
from concurrent.futures import ProcessPoolExecutor

import requests


def get_url(url):
    resp = requests.get(url)
    print(f'{url} - {resp.status_code}')
    return url


jobs = ['http://google.com', 'http://python.org', 'http://facebook.com']

if __name__ == '__main__':
    # create a process pool of 3 workers
    with ProcessPoolExecutor(max_workers=3) as pool:
        # run each job in parallel and gather the returned values
        return_values = list(pool.map(get_url, jobs))
    print(return_values)
Output (the order of the status lines may vary):
http://google.com - 200
http://python.org - 200
http://facebook.com - 200
['http://google.com', 'http://python.org', 'http://facebook.com']
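Since the question is about saving collected data to a csv file, one possible variation is to let each worker return its rows and have the parent write the file once all jobs are done. This is only a sketch under assumptions: `collect`, `write_csv`, the job names, and the output file name `results.csv` are hypothetical and stand in for the real data-gathering code.

```python
import csv
from concurrent.futures import ProcessPoolExecutor


def collect(job):
    """Hypothetical worker: turn one job into one row of data."""
    return [job, len(job)]


def write_csv(rows, fileobj):
    """Write a header plus the collected rows to an open text file."""
    writer = csv.writer(fileobj)
    writer.writerow(['job', 'length'])
    writer.writerows(rows)


if __name__ == '__main__':
    jobs = ['a.txt', 'bb.txt', 'ccc.txt']  # placeholder job list
    with ProcessPoolExecutor(max_workers=3) as pool:
        rows = list(pool.map(collect, jobs))  # results keep the order of jobs
    # the parent saves everything in one place after the workers finish
    with open('results.csv', 'w', newline='') as f:
        write_csv(rows, f)
```

Writing from the parent keeps a single writer per file, so the workers never compete for the same output.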
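Non-task-based multiprocessing
When you just want to run several subprocesses that do not consume jobs the way the first case does, you may want to use multiprocessing.Process.
You can use it in a procedural fashion or in an OOP fashion, similar to threading.Thread.
Example in the procedural fashion (IMHO more pythonic):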
import os
from multiprocessing import Process


def func():
    print(f'hello from: {os.getpid()}')


if __name__ == '__main__':
    processes = [Process(target=func) for _ in range(4)]  # creates 4 processes
    for process in processes:
        process.daemon = True  # terminate the child when the main program exits
        process.start()  # start the process
Output (PIDs will vary; because the daemon processes are never joined, the main program can exit before all of them print):
hello from: 31821
hello from: 31822
hello from: 31823
hello from: 31824
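The OOP fashion mentioned above is not shown in the answer. A minimal sketch of it, assuming a hypothetical `Greeter` subclass, could override `run()` the same way one would with `threading.Thread`:

```python
import os
from multiprocessing import Process


class Greeter(Process):
    """Hypothetical Process subclass; run() executes in the child process."""

    def __init__(self, greeting):
        super().__init__()
        self.greeting = greeting

    def message(self):
        # os.getpid() reports the process that calls this method
        return f'{self.greeting} from: {os.getpid()}'

    def run(self):
        print(self.message())


if __name__ == '__main__':
    workers = [Greeter('hello') for _ in range(4)]
    for worker in workers:
        worker.start()  # creates the child, which then calls run()
    for worker in workers:
        worker.join()  # wait so the output appears before the parent exits
```

Only `run()` executes in the child; `__init__` and any method called directly on the object run in whichever process makes the call.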
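Waiting for the processes to finish
If you want to wait for the processes, use Process.join() (see this SO answer for more on process.join() and process.daemon). You can do it like this: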
import os
import time
from multiprocessing import Process


def func():
    time.sleep(3)
    print(f'hello from: {os.getpid()}')


if __name__ == '__main__':
    processes = [Process(target=func) for _ in range(4)]  # creates 4 processes
    for process in processes:
        process.start()  # start the process
    for process in processes:
        process.join()  # wait for the process to finish
    print('all processes are done!')
This will output (PIDs and their order will vary):
hello from: 31980
hello from: 31983
hello from: 31981
hello from: 31982
all processes are done!
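Coming back to the original question: `join()` runs in the parent process that calls it, so a hook placed there cannot see data the child collected. One possible alternative, sketched below under assumptions, is to signal the child with a `multiprocessing.Event` and do the final save in a `finally` block inside the child itself. The function name `collect_until_stopped`, the placeholder data, and the file name `data.json` are all hypothetical.

```python
import json
import time
from multiprocessing import Event, Process


def collect_until_stopped(stop_event, out_path):
    """Collect data until stop_event is set, then save it before exiting."""
    data = []
    try:
        while not stop_event.is_set():
            data.append(len(data))  # placeholder for real data collection
            time.sleep(0.01)
    finally:
        # runs in the child, where the collected data actually lives
        with open(out_path, 'w') as f:
            json.dump(data, f)


if __name__ == '__main__':
    stop = Event()
    worker = Process(target=collect_until_stopped, args=(stop, 'data.json'))
    worker.start()
    time.sleep(0.5)  # let the worker collect for a while
    stop.set()       # ask the worker to finish its loop
    worker.join()    # wait until the file has been written
```

This only helps when the child is asked to stop cooperatively; `terminate()` kills it without running the `finally` block.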