1 Answer
You have to create a list of (filename, path) pairs:

data = [(img, img_dir) for img in images]

Then map will run each pair in a separate process. But you have to unpack args inside start_pipeline:
def start_pipeline(self, args):
    print('ok starting')
    filename, path = args
    print('filename: {}\npath: {}'.format(filename, path))
    return self.process()
You have to create an instance of the class with (), i.e. segmenter_class(), to use start_pipeline:
pool.map(segmenter_class().start_pipeline, data)
BTW: in the example code I also return the result from process().
import os
import multiprocessing

class SegmentationType(object):
    DISPLAY_NAME = "invalid"

    def __init__(self):
        print('init')

    def read_image(self):
        print('read')

    def write_image(self):
        print('write')

    def process(self):
        # override in derived classes to perform an actual segmentation
        pass

    def start_pipeline(self, args):
        print('ok starting')
        filename, path = args
        print('filename: {}\npath: {}'.format(filename, path))
        return self.process()

class HSV_Segmenter(SegmentationType):
    DISPLAY_NAME = 'HSV'

    def process(self):
        print('ok HSV')
        return "result HSV"

class LabSegmenter(SegmentationType):
    DISPLAY_NAME = 'LAB'

    def process(self):
        print('ok LAB')
        return "result LAB"

if __name__ == '__main__':
    procedure = 'hsv'

    segmenter_class = {
        'hsv': HSV_Segmenter,
        'lab': LabSegmenter,
    }.get(procedure)

    images = ['01.png', '02.png', '03.png']
    img_dir = 'C:/'

    data = [(img, img_dir) for img in images]

    pool = multiprocessing.Pool(3)

    # example 1: collect all results in a list
    results = pool.map(segmenter_class().start_pipeline, data)
    print('Results:', results)

    # example 2: iterate over the results
    for result in pool.map(segmenter_class().start_pipeline, data):
        print('result:', result)

    pool.terminate()
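As a side note: on Python 3 you can let Pool.starmap do the tuple unpacking instead of doing it inside start_pipeline. A minimal sketch of that variation (the two-argument start_pipeline signature here is my modification, not the code above):

    def start_pipeline(self, filename, path):
        # starmap already unpacked the (filename, path) tuple for us
        print('ok starting')
        print('filename: {}\npath: {}'.format(filename, path))
        return self.process()

and in __main__:

    results = pool.starmap(segmenter_class().start_pipeline, data)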
EDIT:
You can also create a function that receives the procedure name together with the data and then looks it up. This way every process creates its own instance of the procedure, and you can even send different procedures to different processes.
import os
import multiprocessing

class SegmentationType(object):
    DISPLAY_NAME = "invalid"

    def __init__(self):
        print('init')

    def read_image(self):
        print('read')

    def write_image(self):
        print('write')

    def process(self):
        # override in derived classes to perform an actual segmentation
        pass

    def start_pipeline(self, args):
        print('ok starting')
        filename, path = args
        print('filename: {}\npath: {}'.format(filename, path))
        return self.process()

class HSV_Segmenter(SegmentationType):
    DISPLAY_NAME = 'HSV'

    def process(self):
        print('ok HSV')
        return "result HSV"

class LabSegmenter(SegmentationType):
    DISPLAY_NAME = 'LAB'

    def process(self):
        print('ok LAB')
        return "result LAB"

segmenters = {
    'hsv': HSV_Segmenter,
    'lab': LabSegmenter,
}

def start_process(args):
    # the first element selects the segmenter, the rest is (filename, path)
    procedure = args[0]
    data = args[1:]
    segmenter_class = segmenters.get(procedure)
    result = segmenter_class().start_pipeline(data)
    return result

if __name__ == '__main__':
    procedure = 'hsv'

    images = ['01.png', '02.png', '03.png']
    img_dir = 'C:/'

    data = [(procedure, img, img_dir) for img in images]

    pool = multiprocessing.Pool(3)

    # example 1
    results = pool.map(start_process, data)
    print('Results:', results)

    # example 2
    for result in pool.map(start_process, data):
        print('result:', result)

    pool.terminate()
Example with different procedures:
if __name__ == '__main__':
    images = ['01.png', '02.png', '03.png']
    img_dir = 'C:/'

    pool = multiprocessing.Pool(3)

    # run all images with the HSV segmenter
    data = [('hsv', img, img_dir) for img in images]
    results = pool.map(start_process, data)
    print('Results HSV:', results)

    # run all images with the LAB segmenter
    data = [('lab', img, img_dir) for img in images]
    results = pool.map(start_process, data)
    print('Results LAB:', results)

    pool.terminate()
And the same with a single map(). There are 6 jobs to start but Pool(3), so it runs only 3 processes at the same time; whenever a process becomes free, map takes the next value from the list and runs it.
if __name__ == '__main__':
    images = ['01.png', '02.png', '03.png']
    img_dir = 'C:/'

    data_hsv = [('hsv', img, img_dir) for img in images]
    data_lab = [('lab', img, img_dir) for img in images]
    data = data_hsv + data_lab

    pool = multiprocessing.Pool(3)

    # example 1
    results = pool.map(start_process, data)
    print('Results:', results)

    # example 2
    for result in pool.map(start_process, data):
        print('result:', result)

    pool.terminate()
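If the order of the results does not matter, Pool.imap_unordered is a minimal variation of example 2: it yields each result as soon as a worker finishes instead of waiting for the whole list (a sketch reusing start_process and data from above):

    # results arrive in completion order, not submission order
    for result in pool.imap_unordered(start_process, data):
        print('result:', result)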
EDIT:
It also works with Ray. It only needs

from ray.util import multiprocessing

instead of

import multiprocessing
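So the combined example from above becomes, with only the import changed (a minimal sketch, assuming the classes, the segmenters dict and start_process stay exactly as before):

from ray.util import multiprocessing

# ... same classes, segmenters dict and start_process as above ...

if __name__ == '__main__':
    images = ['01.png', '02.png', '03.png']
    img_dir = 'C:/'

    data = [('hsv', img, img_dir) for img in images]

    pool = multiprocessing.Pool(3)
    results = pool.map(start_process, data)
    print('Results:', results)
    pool.terminate()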
EDIT:

Example with Joblib:
from joblib import Parallel, delayed

class SegmentationType(object):
    DISPLAY_NAME = "invalid"

    def __init__(self):
        print('init')

    def read_image(self):
        print('read')

    def write_image(self):
        print('write')

    def process(self):
        # override in derived classes to perform an actual segmentation
        pass

    def start_pipeline(self, args):
        print('ok starting')
        filename, path = args
        print('filename: {}\npath: {}'.format(filename, path))
        return self.process()

class HSV_Segmenter(SegmentationType):
    DISPLAY_NAME = 'HSV'

    def process(self):
        print('ok HSV')
        return "result HSV"

class LabSegmenter(SegmentationType):
    DISPLAY_NAME = 'LAB'

    def process(self):
        print('ok LAB')
        return "result LAB"

segmenters = {
    'hsv': HSV_Segmenter,
    'lab': LabSegmenter,
}

def start_process(args):
    # the first element selects the segmenter, the rest is (filename, path)
    procedure = args[0]
    data = args[1:]
    segmenter_class = segmenters.get(procedure)
    result = segmenter_class().start_pipeline(data)
    return result

if __name__ == '__main__':
    images = ['01.png', '02.png', '03.png']
    img_dir = 'C:/'

    data_hsv = [('hsv', img, img_dir) for img in images]
    data_lab = [('lab', img, img_dir) for img in images]
    data = data_hsv + data_lab

    # --- version 1 ---

    #pool = Parallel(n_jobs=3, backend='threading')
    #pool = Parallel(n_jobs=3, backend='multiprocessing')
    pool = Parallel(n_jobs=3)

    # example 1
    results = pool(delayed(start_process)(args) for args in data)
    print('Results:', results)

    # example 2
    for result in pool(delayed(start_process)(args) for args in data):
        print('result:', result)

    # --- version 2 ---

    #with Parallel(n_jobs=3, backend='threading') as pool:
    #with Parallel(n_jobs=3, backend='multiprocessing') as pool:
    with Parallel(n_jobs=3) as pool:

        # example 1
        results = pool(delayed(start_process)(args) for args in data)
        print('Results:', results)

        # example 2
        for result in pool(delayed(start_process)(args) for args in data):
            print('result:', result)
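For completeness, the standard library's concurrent.futures follows the same pattern; a minimal sketch, reusing start_process and the test data from above:

from concurrent.futures import ProcessPoolExecutor

if __name__ == '__main__':
    images = ['01.png', '02.png', '03.png']
    img_dir = 'C:/'

    data = [('hsv', img, img_dir) for img in images]

    # executor.map behaves like Pool.map and returns results in order
    with ProcessPoolExecutor(max_workers=3) as executor:
        for result in executor.map(start_process, data):
            print('result:', result)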