队列对象应仅通过继承在进程之间共享,即使 mp 也是如此.使用 Manager.Queue()



如下面发布的代码所示,我使用方法startProcessing(self)中所示的map_async.当代码的执行在标记为code-1部分中的代码中输入if语句时,应调用getResults方法 和块等待所有进程完成。我面临的问题是,尽管run(self,params)方法返回列表,但对proc.get()的调用会导致应用程序崩溃并生成下面发布的错误消息。

请让我知道为什么我收到此错误消息以及如何解决它

代码 1 - 对象的实例化

NDVIsPer10mX10mForNoneKeyWindowQueue = multiprocessing.Manager().Queue()  
areasOfCoveragePerNoneKeyWindowQueue = multiprocessing.Manager().Queue()   
noneKeyGridCellsProcessingPool = NoneKeyGridCellsProcessingPool(
NDVIsPer10mX10mForNoneKeyWindowQueue, 
areasOfCoveragePerNoneKeyWindowQueue,  
ndviTIFFDetails.getNDVIValuePer10mX10m(),
pixelsValuesDoNotSatisfyThresholdInTIFFImageDatasetCnt,
fourCornersOfWindowInEPSG25832,
[]
)
noneKeyGridCellsProcessingPool.startProcessing()
if (noneKeyWindowCnt > 0):
'''close pools: do not accept or allow any new tasks/jobs'''
resultsForNoneKeyGridCellsProcessingPool = NoneKeyGridCellsProcessingPool.getResults() #<====generates the error message posted below 
NoneKeyGridCellsProcessingPool.closePool()

类中的代码 NoneKeyGridCellsProcessingPool

@staticmethod
def getResults():
for proc in NoneKeyGridCellsProcessingPool.procs:
proc.get() #<====generates the error message posted below 
last = MiscUtils.getElementFromArrayForIndex(NoneKeyGridCellsProcessingPool.procs,len(NoneKeyGridCellsProcessingPool.procs) - 1)
return last.get()

def __init__(
self,

NDVIsPer10mX10mForNoneKeyWindowQueue:Queue, 
areasOfCoveragePerNoneKeyWindowQueue:Queue, 

NDVIValuePer10mX10m,
pixelsValuesDoNotSatisfyThresholdInTIFFImageDatasetCnt,
fourCornersOfWindowInEPSG25832,
fourCornersOfNoneKeyWindowInEPSG4326
):
super().__init__()

self.params = (
NDVIsPer10mX10mForNoneKeyWindowQueue, 
areasOfCoveragePerNoneKeyWindowQueue, 

NDVIValuePer10mX10m,
pixelsValuesDoNotSatisfyThresholdInTIFFImageDatasetCnt,
fourCornersOfWindowInEPSG25832,
fourCornersOfNoneKeyWindowInEPSG4326
)
self.res = None
def run(self,params):
self.NDVIsPer10mX10mForNoneKeyWindowQueue = params[0] 
self.areasOfCoveragePerNoneKeyWindowQueue = params[1]

NDVIValuePer10mX10m = params[2]
pixelsValuesDoNotSatisfyThresholdInTIFFImageDatasetCnt = params[3]
fourCornersOfWindowInEPSG25832 = params[4]
fourCornersOfNoneKeyWindowInEPSG4326 = params[5]
....
....
....
return self.NDVIsPer10mX10mForNoneKeyWindowQueue,self.areasOfCoveragePerNoneKeyWindowQueue

def startProcessing(self):
self.res = NoneKeyGridCellsProcessingPool.pool.map_async(self.run, [self.params])
NoneKeyGridCellsProcessingPool.procs.append(self.res)

错误消息

File "C:Python310libmultiprocessingcontext.py", line 359, in assert_spawning
raise RuntimeError(
RuntimeError: Queue objects should only be shared between processes through inheritance

注意:

i am returning a list as shown in the code in the class NoneKeyGridCellsProcessingPool, but .get() method as in `proc.get()` seems sees the returned values as a queue??!!

1st_update

为了解决这个问题,我在类中注释了 return 语句NoneKeyGridCellsProcessingPool出来,并将其替换为return 'ok',但是,我收到了相同的错误消息

不能向子进程传递或返回子进程或从子进程返回multiprocessing.Queue。队列应该已经在父进程中可用,在这种情况下,您只需从中读取即可。否则,必须传递实际的列表或元组,而不是多处理队列。

下面是生成类似堆栈跟踪的示例。鉴于您说过您不返回的队列类型不multiprocessing.Queue。假设其中一个参数包含multiprocessing.Queue(或是一个)是合理的。你需要弄清楚是哪个。也许尝试只是获取并返回其参数的虚拟作业,每个单独的参数一次。KBKself作为隐式参数传递给

何时执行pool.map_async(self.run, ...)
import multiprocessing
def main():
with multiprocessing.Pool() as pool:
queue = multiprocessing.Queue()
# The given queue is not allowed to be passed as an argument,
# but no error is raised by the following line.
future = pool.apply_async(dummy_job, [queue])
# Instead, the error is raised here. If the queue is changed to
# an instance of Manager.Queue then no error will be raised.
result = future.get()
print(result)
def dummy_job(arg):
return 'foo'
if __name__ == '__main__':
main()

思想

最新更新