I want to analyze multiple images in parallel using multiprocessing. My classes are:
import os
import cv2

class SegmentationType(object):
    DISPLAY_NAME = "invalid"

    def __init__(self, filename, path):
        self.filename = filename
        self.path = path
        self.input_data = None
        self.output_data = None

    def read_image(self):
        self.input_data = cv2.imread(os.path.join(self.path, self.filename))

    def write_image(self):
        out_name = self.filename.split('.')[0] + '_' + self.DISPLAY_NAME + '.png'
        cv2.imwrite(os.path.join(self.path, out_name), self.output_data)

    def process(self):
        # override in derived classes to perform an actual segmentation
        pass

    def start_pipeline(self):
        self.read_image()
        self.process()
        self.write_image()
class HSV_Segmenter(SegmentationType):
    DISPLAY_NAME = 'HSV'

    def process(self):
        # rgb_to_hsv and threshold_otsu are helper functions defined elsewhere
        source = rgb_to_hsv(self.input_data)
        self.output_data = threshold_otsu(source)

class LabSegmenter(SegmentationType):
    DISPLAY_NAME = 'LAB'

    def process(self):
        # rgb_to_lab and global_threshold are helper functions defined elsewhere
        source = rgb_to_lab(self.input_data)
        self.output_data = global_threshold(source)
segmenter_class = {
    'hsv': HSV_Segmenter,
    'lab': LabSegmenter,
}.get(procedure)

if not segmenter_class:
    raise ValueError("Invalid segmentation method '{}'".format(procedure))

for img in images:
    os.chdir(img_dir)
    processor = segmenter_class(img, img_dir)
    processor.start_pipeline()
However, I don't know how to call the map function:
import numpy as np
import multiprocessing

image_lst = os.listdir(my_image_path)

# We split the list into sublists because of a 512 GB RAM limitation
if len(image_lst) > 4:
    nr_of_sublists = int(len(image_lst) / 2.5)
    image_sub_lst = np.array_split(image_lst, nr_of_sublists)
else:
    image_sub_lst = [image_lst]

# We do the analysis for each sublist
for sub_lst in image_sub_lst:
    print(sub_lst)
    pool = multiprocessing.Pool(8)
    # Call the processor
    processor = segmenter_class(img, img_dir)
    processor.start_pipeline()
    # How to call map???
    pool.map(?, sub_lst)
    pool.terminate()
EDIT:

I tried to change the code as suggested in the comments, but I still get an error:
import os
import multiprocessing

class SegmentationType(object):
    DISPLAY_NAME = "invalid"

    def __init__(self):
        print('init')

    def read_image(self):
        print('read')

    def write_image(self):
        print('write')

    def process(self):
        # override in derived classes to perform an actual segmentation
        pass

    def start_pipeline(self, args):
        print('ok starting')
        filename, path = args
        print(filename, path)
        self.process()

class HSV_Segmenter(SegmentationType):
    DISPLAY_NAME = 'HSV'

    def process(self):
        print('ok HSV')

class LabSegmenter(SegmentationType):
    DISPLAY_NAME = 'LAB'

    def process(self):
        print('ok LAB')

procedure = 'hsv'

segmenter_class = {
    'hsv': HSV_Segmenter,
    'lab': LabSegmenter
}.get(procedure)

images = ['01.png', '02.png', '03.png']
img_dir = 'C:/'

if __name__ == '__main__':
    pool = multiprocessing.Pool(3)
    pool.map(segmenter_class.start_pipeline, [images, img_dir])
    pool.terminate()
The error:

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:/Users/lueck/PycharmProjects/hyphe_cmd/hype_cmd/multy.py", line 50, in <module>
    pool.map(segmenter_class.start_pipeline, [images, img_dir])
  File "C:\Users\lueck\AppData\Local\Continuum\anaconda3\envs\hyphe_env\lib\multiprocessing\pool.py", line 266, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "C:\Users\lueck\AppData\Local\Continuum\anaconda3\envs\hyphe_env\lib\multiprocessing\pool.py", line 644, in get
    raise self._value
TypeError: start_pipeline() missing 1 required positional argument: 'args'
You have to create a list of pairs (filename, path):

data = [(img, img_dir) for img in images]

and then map will run each pair in a separate process. But you have to unpack args inside start_pipeline:
def start_pipeline(self, args):
    print('ok starting')
    filename, path = args
    print('filename: {}\npath: {}'.format(filename, path))
    return self.process()
You also have to create an instance of the class segmenter_class with () in order to use start_pipeline:

pool.map(segmenter_class().start_pipeline, data)
BTW: In the example code I also return the result from the process.
import os
import multiprocessing

class SegmentationType(object):
    DISPLAY_NAME = "invalid"

    def __init__(self):
        print('init')

    def read_image(self):
        print('read')

    def write_image(self):
        print('write')

    def process(self):
        # override in derived classes to perform an actual segmentation
        pass

    def start_pipeline(self, args):
        print('ok starting')
        filename, path = args
        print('filename: {}\npath: {}'.format(filename, path))
        return self.process()

class HSV_Segmenter(SegmentationType):
    DISPLAY_NAME = 'HSV'

    def process(self):
        print('ok HSV')
        return "result HSV"

class LabSegmenter(SegmentationType):
    DISPLAY_NAME = 'LAB'

    def process(self):
        print('ok LAB')
        return "result LAB"

if __name__ == '__main__':
    procedure = 'hsv'

    segmenter_class = {
        'hsv': HSV_Segmenter,
        'lab': LabSegmenter,
    }.get(procedure)

    images = ['01.png', '02.png', '03.png']
    img_dir = 'C:/'

    data = [(img, img_dir) for img in images]

    pool = multiprocessing.Pool(3)

    # example 1
    results = pool.map(segmenter_class().start_pipeline, data)
    print('Results:', results)

    # example 2
    for result in pool.map(segmenter_class().start_pipeline, data):
        print('result:', result)

    pool.terminate()
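A side note, not part of the original answer: Pool.starmap unpacks each tuple into separate arguments, so the manual filename, path = args step becomes unnecessary. A minimal sketch of that variant, assuming the same classes and data as above, with start_pipeline in SegmentationType changed to take the two values directly:

# hypothetical variant: start_pipeline takes two parameters instead of one tuple
def start_pipeline(self, filename, path):
    print('filename: {}\npath: {}'.format(filename, path))
    return self.process()

if __name__ == '__main__':
    pool = multiprocessing.Pool(3)
    # starmap unpacks each (img, img_dir) pair into two arguments
    results = pool.starmap(segmenter_class().start_pipeline, data)
    print('Results:', results)
    pool.terminate()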
EDIT:

You can also create a function which gets procedure and data and then uses them in map. This way every process creates its own segmenter instance, and you can even send different procedures to different processes.
import os
import multiprocessing

class SegmentationType(object):
    DISPLAY_NAME = "invalid"

    def __init__(self):
        print('init')

    def read_image(self):
        print('read')

    def write_image(self):
        print('write')

    def process(self):
        # override in derived classes to perform an actual segmentation
        pass

    def start_pipeline(self, args):
        print('ok starting')
        filename, path = args
        print('filename: {}\npath: {}'.format(filename, path))
        return self.process()

class HSV_Segmenter(SegmentationType):
    DISPLAY_NAME = 'HSV'

    def process(self):
        print('ok HSV')
        return "result HSV"

class LabSegmenter(SegmentationType):
    DISPLAY_NAME = 'LAB'

    def process(self):
        print('ok LAB')
        return "result LAB"

segmenters = {
    'hsv': HSV_Segmenter,
    'lab': LabSegmenter,
}

def start_process(args):
    # the first element selects the segmenter; the rest is passed on
    procedure = args[0]
    data = args[1:]
    segmenter_class = segmenters.get(procedure)
    result = segmenter_class().start_pipeline(data)
    return result

if __name__ == '__main__':
    procedure = 'hsv'

    images = ['01.png', '02.png', '03.png']
    img_dir = 'C:/'

    data = [(procedure, img, img_dir) for img in images]

    pool = multiprocessing.Pool(3)

    # example 1
    results = pool.map(start_process, data)
    print('Results:', results)

    # example 2
    for result in pool.map(start_process, data):
        print('result:', result)

    pool.terminate()
An example with different procedures:
if __name__ == '__main__':
    images = ['01.png', '02.png', '03.png']
    img_dir = 'C:/'

    pool = multiprocessing.Pool(3)

    data = [('hsv', img, img_dir) for img in images]
    results = pool.map(start_process, data)
    print('Results HSV:', results)

    data = [('lab', img, img_dir) for img in images]
    results = pool.map(start_process, data)
    print('Results LAB:', results)

    pool.terminate()
The same works with a single map call. There are six jobs to start but Pool(3), so it will run only three processes at the same time; whenever a process becomes free, map takes the next value from the list and runs it.
if __name__ == '__main__':
    images = ['01.png', '02.png', '03.png']
    img_dir = 'C:/'

    data_hsv = [('hsv', img, img_dir) for img in images]
    data_lab = [('lab', img, img_dir) for img in images]
    data = data_hsv + data_lab

    pool = multiprocessing.Pool(3)

    # example 1
    results = pool.map(start_process, data)
    print('Results:', results)

    # example 2
    for result in pool.map(start_process, data):
        print('results:', result)

    pool.terminate()
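A further note, my addition: if you want to consume results as soon as each job finishes rather than waiting for the whole list, Pool.imap_unordered yields results in completion order. A minimal sketch using the same start_process and data as above:

if __name__ == '__main__':
    pool = multiprocessing.Pool(3)
    # yields each result as soon as its worker finishes,
    # not in the original order of `data`
    for result in pool.imap_unordered(start_process, data):
        print('finished:', result)
    pool.terminate()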
EDIT:

It also works with Ray. It only needs

from ray.util import multiprocessing

instead of

import multiprocessing

I didn't test it with Dask, PySpark, or Joblib.
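For illustration, a minimal sketch of that swap, assuming Ray is installed (pip install ray) and that start_process and data are defined as in the examples above; ray.util.multiprocessing.Pool is Ray's drop-in replacement for the standard Pool:

from ray.util import multiprocessing  # drop-in replacement for the stdlib Pool

if __name__ == '__main__':
    pool = multiprocessing.Pool(3)
    results = pool.map(start_process, data)
    print('Results:', results)
    pool.terminate()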
EDIT:

An example with Joblib:
from joblib import Parallel, delayed

class SegmentationType(object):
    DISPLAY_NAME = "invalid"

    def __init__(self):
        print('init')

    def read_image(self):
        print('read')

    def write_image(self):
        print('write')

    def process(self):
        # override in derived classes to perform an actual segmentation
        pass

    def start_pipeline(self, args):
        print('ok starting')
        filename, path = args
        print('filename: {}\npath: {}'.format(filename, path))
        return self.process()

class HSV_Segmenter(SegmentationType):
    DISPLAY_NAME = 'HSV'

    def process(self):
        print('ok HSV')
        return "result HSV"

class LabSegmenter(SegmentationType):
    DISPLAY_NAME = 'LAB'

    def process(self):
        print('ok LAB')
        return "result LAB"

segmenters = {
    'hsv': HSV_Segmenter,
    'lab': LabSegmenter,
}

def start_process(args):
    procedure = args[0]
    data = args[1:]
    segmenter_class = segmenters.get(procedure)
    result = segmenter_class().start_pipeline(data)
    return result

if __name__ == '__main__':
    images = ['01.png', '02.png', '03.png']
    img_dir = 'C:/'

    data_hsv = [('hsv', img, img_dir) for img in images]
    data_lab = [('lab', img, img_dir) for img in images]
    data = data_hsv + data_lab

    # --- version 1 ---

    #pool = Parallel(n_jobs=3, backend='threading')
    #pool = Parallel(n_jobs=3, backend='multiprocessing')
    pool = Parallel(n_jobs=3)

    # example 1
    results = pool( delayed(start_process)(args) for args in data )
    print('Results:', results)

    # example 2
    for result in pool( delayed(start_process)(args) for args in data ):
        print('result:', result)

    # --- version 2 ---

    #with Parallel(n_jobs=3, backend='threading') as pool:
    #with Parallel(n_jobs=3, backend='multiprocessing') as pool:
    with Parallel(n_jobs=3) as pool:

        # example 1
        results = pool( delayed(start_process)(args) for args in data )
        print('Results:', results)

        # example 2
        for result in pool( delayed(start_process)(args) for args in data ):
            print('result:', result)