图像的OOP多处理



我想使用多处理并行分析多个图像,我的类是

class SegmentationType(object):
    """Base class of the segmentation pipeline.

    Subclasses set ``DISPLAY_NAME`` and override ``process``; the
    pipeline reads the image, segments it and writes the result.
    """

    DISPLAY_NAME = "invalid"  # overridden by concrete segmenters

    def __init__(self, filename, path):
        self.filename = filename
        self.path = path
        self.input_data = None   # filled by read_image()
        self.output_data = None  # filled by process()

    def read_image(self):
        # FIX: cv2.imread returns the image array directly (or None on
        # failure); the original trailing ``[1]`` selected a single row.
        self.input_data = cv2.imread(self.path + self.filename)

    def write_image(self):
        # Writes '<basename>_<DISPLAY_NAME>.png' next to the input image.
        cv2.imwrite(self.path + self.filename.split('.')[0] + '_' + self.DISPLAY_NAME + '.png', self.output_data)

    def process(self):
        # override in derived classes to perform an actual segmentation
        pass

    def start_pipeline(self):
        self.read_image()
        self.process()
        self.write_image()
class HSV_Segmenter(SegmentationType):
    # Otsu thresholding on the HSV representation of the input image.
    DISPLAY_NAME = 'HSV'

    def process(self):
        # NOTE(review): rgb_to_hsv and treshold_otsu (sic) are helpers not
        # defined in this snippet — presumably skimage-style functions; verify.
        source = rgb_to_hsv(self.input_data)
        self.output_data = treshold_otsu(source)

class LabSegmenter(SegmentationType):
    # Global thresholding on the LAB representation of the input image.
    DISPLAY_NAME = 'LAB'

    def process(self):
        # NOTE(review): rgb_to_lab and global_threshold are helpers not
        # defined in this snippet — confirm their origin before reuse.
        source = rgb_to_lab(self.input_data)
        self.output_data = global_threshold(source)

# Map the CLI option to a segmenter class; fail loudly on unknown names.
segmenter_class = {
    'hsv': HSV_Segmenter,  # FIX: was 'HSV_Segmentation' (undefined name)
    'lab': LabSegmenter,   # FIX: was 'LAB_Segmenter' (undefined name)
}.get(procedure)
if not segmenter_class:
    # FIX: ArgumentError is not a builtin exception; ValueError fits here.
    raise ValueError("Invalid segmentation method '{}'".format(procedure))
for img in images:
    os.chdir(img_dir)
    # FIX: removed the duplicated '=' (syntax error) and the extra
    # 'procedure' argument — __init__ takes (filename, path) only.
    processor = segmenter_class(img, img_dir)
    processor.start_pipeline()

然而，我不知道该如何调用 map 函数：

image_lst = os.listdir(my_image_path)
# Split the list into sublists to bound peak memory use.
# NOTE(review): the original comment said "5 elements" but len/2.5 yields
# roughly 2-3 images per sublist — confirm the intended chunk size.
if len(image_lst) > 4:
    nr_of_sublists = int(len(image_lst) / 2.5)
    image_sub_lst = np.array_split(image_lst, nr_of_sublists)
else:
    image_sub_lst = [image_lst]

# Process each sublist with its own worker pool.
for sub_lst in image_sub_lst:
    print(sub_lst)
    pool = multiprocessing.Pool(8)
    # FIX: map() needs a callable plus one iterable item per task. Build a
    # (filename, path) pair per image and hand each pair to the pipeline of
    # a single segmenter instance (start_pipeline must accept the pair).
    data = [(img, img_dir) for img in sub_lst]
    pool.map(segmenter_class().start_pipeline, data)
    pool.terminate()

编辑:

我试图将代码更改为注释,但仍然收到一个错误:

import os
import multiprocessing
class SegmentationType(object):
    # Stub base class used to debug the multiprocessing call chain:
    # each hook only prints a trace message.
    DISPLAY_NAME = "invalid"

    def __init__(self):
        print ('init')

    def read_image(self):
        print ('read')

    def write_image(self):
        print ('write')

    def process(self):
        # override in derived classes to perform an actual segmentation
        pass

    def start_pipeline(self, args):
        # args is one (filename, path) tuple supplied by pool.map.
        print ('ok starting')
        filename, path = args
        print(filename, path)
        self.process()
class HSV_Segmenter(SegmentationType):
    # Debug stub: only reports that the HSV branch ran.
    DISPLAY_NAME = 'HSV'

    def process(self):
        print ('ok HSV')
class LabSegmenter(SegmentationType):
    # Debug stub: only reports that the LAB branch ran.
    DISPLAY_NAME = 'LAB'

    def process(self):
        print ('ok LAB')
procedure = 'hsv'
# Choose the segmenter implementation by name.
segmenter_class = {
    'hsv': HSV_Segmenter,
    'lab': LabSegmenter
}.get(procedure)

images = ['01.png', '02.png', '03.png']
img_dir = 'C:/'

if __name__ == '__main__':
    # FIX: start_pipeline is an instance method, so an instance must be
    # created with (), and map needs one (filename, path) pair per task —
    # the original passed the unbound method and [images, img_dir], which
    # raised "start_pipeline() missing 1 required positional argument".
    data = [(img, img_dir) for img in images]
    pool = multiprocessing.Pool(3)
    pool.map(segmenter_class().start_pipeline, data)
    pool.terminate()

错误:上述异常是以下异常的直接原因:

Traceback (most recent call last):
  File "C:/Users/lueck/PycharmProjects/hyphe_cmd/hyphe_cmd/multy.py", line 50, in &lt;module&gt;
    pool.map(segmenter_class.start_pipeline, [images, img_dir])
  File "C:\Users\lueck\AppData\Local\Continuum\anaconda3\envs\hyphe_env\lib\multiprocessing\pool.py", line 266, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "C:\Users\lueck\AppData\Local\Continuum\anaconda3\envs\hyphe_env\lib\multiprocessing\pool.py", line 644, in get
    raise self._value
TypeError: start_pipeline() missing 1 required positional argument: 'args'

您必须创建一个由 (filename, path) 对组成的列表

# One (filename, path) task per image — the iterable handed to pool.map.
data = [(img, img_dir) for img in images]

然后 map 会把每一对数据分发到各自独立的进程中运行。

但你必须在start_pipeline中获得args

def start_pipeline(self, args):
    """Unpack the (filename, path) pair sent by pool.map and run process()."""
    print('ok starting')

    filename, path = args
    # FIX: the backslash was lost in formatting — restore the '\n' escape.
    print('filename: {}\npath: {}'.format(filename, path))

    return self.process()

您必须用 () 创建 segmenter_class 类的实例，才能使用它的 start_pipeline 方法

pool.map(segmenter_class().start_pipeline, data)

BTW:在示例代码中,我还返回进程的结果。


import os
import multiprocessing
class SegmentationType(object):
    """Stub base class: prints trace messages instead of doing real I/O."""

    DISPLAY_NAME = "invalid"

    def __init__(self):
        print('init')

    def read_image(self):
        print('read')

    def write_image(self):
        print('write')

    def process(self):
        # override in derived classes to perform an actual segmentation
        pass

    def start_pipeline(self, args):
        """Entry point for pool.map; args is one (filename, path) pair."""
        print('ok starting')

        filename, path = args
        # FIX: restore the newline escape lost in formatting ('{}npath').
        print('filename: {}\npath: {}'.format(filename, path))

        return self.process()
class HSV_Segmenter(SegmentationType):
    # Returns a marker string so the pool.map results are visible.
    DISPLAY_NAME = 'HSV'

    def process(self):
        print('ok HSV')
        return "result HSV"

class LabSegmenter(SegmentationType):
    # Returns a marker string so the pool.map results are visible.
    DISPLAY_NAME = 'LAB'

    def process(self):
        print('ok LAB')
        return "result LAB"
if __name__ == '__main__':
    # Pick the segmenter implementation by name.
    procedure = 'hsv'

    segmenter_class = {
        'hsv': HSV_Segmenter,
        'lab': LabSegmenter,
    }.get(procedure)

    images = ['01.png', '02.png', '03.png']
    img_dir = 'C:/'

    # One (filename, path) pair per task for pool.map.
    data = [(img, img_dir) for img in images]

    pool = multiprocessing.Pool(3)
    # example 1: collect all results at once
    results = pool.map(segmenter_class().start_pipeline, data)
    print('Results:', results)
    # example 2: iterate over the results
    for result in pool.map(segmenter_class().start_pipeline, data):
        print('result:', result)
    pool.terminate()

编辑:

您还可以创建一个接收 procedure 和 data 的函数，然后在 map 中使用它——这样每个进程都会创建自己的 procedure 实例，或者您也可以向不同的进程发送不同的 procedure。

import os
import multiprocessing
class SegmentationType(object):
    """Stub base class: prints trace messages instead of doing real I/O."""

    DISPLAY_NAME = "invalid"

    def __init__(self):
        print('init')

    def read_image(self):
        print('read')

    def write_image(self):
        print('write')

    def process(self):
        # override in derived classes to perform an actual segmentation
        pass

    def start_pipeline(self, args):
        """Entry point for pool.map; args is one (filename, path) pair."""
        print('ok starting')

        filename, path = args
        # FIX: restore the newline escape lost in formatting ('{}npath').
        print('filename: {}\npath: {}'.format(filename, path))

        return self.process()
class HSV_Segmenter(SegmentationType):
    # Returns a marker string so the pool.map results are visible.
    DISPLAY_NAME = 'HSV'

    def process(self):
        print('ok HSV')
        return "result HSV"

class LabSegmenter(SegmentationType):
    # Returns a marker string so the pool.map results are visible.
    DISPLAY_NAME = 'LAB'

    def process(self):
        print('ok LAB')
        return "result LAB"
# Registry of available segmenters, keyed by procedure name.
segmenters = {
    'hsv': HSV_Segmenter,
    'lab': LabSegmenter,
}

def start_process(args):
    # args = (procedure, filename, path): resolving the class here means
    # every worker process creates its own segmenter instance.
    procedure = args[0]
    data = args[1:]
    segmenter_class = segmenters.get(procedure)
    result = segmenter_class().start_pipeline(data)
    return result

if __name__ == '__main__':
    procedure = 'hsv'

    images = ['01.png', '02.png', '03.png']
    img_dir = 'C:/'

    # (procedure, filename, path) triples: each worker resolves its own class.
    data = [(procedure, img, img_dir) for img in images]

    pool = multiprocessing.Pool(3)
    # example 1: collect all results at once
    results = pool.map(start_process, data)
    print('Results:', results)
    # example 2: iterate over the results
    # FIX: 'segmenter_class' is never defined in this version (NameError);
    # the tasks must go through the start_process() helper as in example 1.
    for result in pool.map(start_process, data):
        print('result:', result)
    pool.terminate()

不同程序的示例

if __name__ == '__main__':
    images = ['01.png', '02.png', '03.png']
    img_dir = 'C:/'

    pool = multiprocessing.Pool(3)
    # Run the HSV batch first on the shared pool...
    data = [('hsv', img, img_dir) for img in images]
    results = pool.map(start_process, data)
    print('Results HSV:', results)
    # ...then the LAB batch on the same pool.
    data = [('lab', img, img_dir) for img in images]
    results = pool.map(start_process, data)
    print('Results LAB:', results)
    pool.terminate()

把所有数据合并成一个 data 列表也是如此。共有 6 个任务而 Pool(3) 只会同时运行 3 个进程；当有进程空闲时，map 会从列表中取出下一个值并交给它运行。

if __name__ == '__main__':
    images = ['01.png', '02.png', '03.png']
    img_dir = 'C:/'

    # Merge both procedures into one task list; Pool(3) runs at most
    # three of the six tasks at a time, pulling the next as workers free up.
    data_hsv = [('hsv', img, img_dir) for img in images]
    data_lab = [('lab', img, img_dir) for img in images]

    data = data_hsv + data_lab
    pool = multiprocessing.Pool(3)
    # example 1: collect all results at once
    results = pool.map(start_process, data)
    print('Results:', results)
    # example 2: iterate over the results
    for result in pool.map(start_process, data):
        print('results:', result)
    pool.terminate()

编辑:

它也适用于Ray

它只需要

from ray.util import multiprocessing

而不是

import multiprocessing

我没有用Dask、PySpark或Joblib 测试它


编辑:

Joblib 示例

from joblib import Parallel, delayed
class SegmentationType(object):
    """Stub base class: prints trace messages instead of doing real I/O."""

    DISPLAY_NAME = "invalid"

    def __init__(self):
        print('init')

    def read_image(self):
        print('read')

    def write_image(self):
        print('write')

    def process(self):
        # override in derived classes to perform an actual segmentation
        pass

    def start_pipeline(self, args):
        """Entry point for the parallel map; args is one (filename, path) pair."""
        print('ok starting')

        filename, path = args
        # FIX: restore the newline escape lost in formatting ('{}npath').
        print('filename: {}\npath: {}'.format(filename, path))

        return self.process()
class HSV_Segmenter(SegmentationType):
    # Returns a marker string so the parallel results are visible.
    DISPLAY_NAME = 'HSV'

    def process(self):
        print('ok HSV')
        return "result HSV"
class LabSegmenter(SegmentationType):
    # Returns a marker string so the parallel results are visible.
    DISPLAY_NAME = 'LAB'

    def process(self):
        print('ok LAB')
        return "result LAB"
# Registry of available segmenters, keyed by procedure name.
segmenters = {
    'hsv': HSV_Segmenter,
    'lab': LabSegmenter,
}

def start_process(args):
    # args = (procedure, filename, path): resolving the class here means
    # every worker creates its own segmenter instance.
    procedure = args[0]
    data = args[1:]

    segmenter_class = segmenters.get(procedure)
    result = segmenter_class().start_pipeline(data)

    return result
if __name__ == '__main__':
    images = ['01.png', '02.png', '03.png']
    img_dir = 'C:/'

    # Two batches (HSV and LAB) merged into one task list.
    data_hsv = [('hsv', img, img_dir) for img in images]
    data_lab = [('lab', img, img_dir) for img in images]

    data = data_hsv + data_lab

    # --- version 1 --- plain Parallel object; other joblib backends below.
    #pool = Parallel(n_jobs=3, backend='threading')
    #pool = Parallel(n_jobs=3, backend='multiprocessing')
    pool = Parallel(n_jobs=3)

    # example 1: collect all results at once
    results = pool( delayed(start_process)(args) for args in data )
    print('Results:', results)

    # example 2: iterate over the results
    for result in pool( delayed(start_process)(args) for args in data ):
        print('result:', result)

    # --- version 2 --- context-manager form reuses the worker pool.
    #with Parallel(n_jobs=3, backend='threading') as pool:
    #with Parallel(n_jobs=3, backend='multiprocessing') as pool:
    with Parallel(n_jobs=3) as pool:
        # example 1
        results = pool( delayed(start_process)(args) for args in data )
        print('Results:', results)
        # example 1
        for result in pool( delayed(start_process)(args) for args in data ):
            print('result:', result)

相关内容

  • 没有找到相关文章