Race condition when using multiprocessing BaseManager and Pool in Python 3


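Can anyone help me? I have spent a lot of time debugging a race condition that occurs when using BaseManager and Pool from the multiprocessing library. Here is the simplified code: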
import sys, time
from multiprocessing.managers import BaseManager, SyncManager, BaseProxy
from multiprocessing import Process, cpu_count, Pool, Lock, get_context
from multiprocessing.queues import Queue, JoinableQueue
import queue

class QueueManager(BaseManager):
    pass

class Singleton:
    '''
    Decorator class for singleton pattern.
    '''
    def __init__(self, cls):
        self._cls = cls
        self._lock = Lock()
        self._instance = {}

    def __call__(self, *args, **kwargs):
        if self._cls not in self._instance:
            with self._lock:
                self._instance[self._cls] = self._cls(*args, **kwargs)
        return self._instance[self._cls]

    def getInstance(self):
        return self._instance[self._cls]

class LoggingServer(object):
    def __init__(self, address, pwd):
        self.logServerAddr = address
        self.logServerPwd = pwd
        self.msgQueue = queue.Queue()
        try:
            QueueManager.register('getQueue', callable=lambda: self.msgQueue)
            self.queueManager = QueueManager(address = self.logServerAddr, authkey = self.logServerPwd)
            self.logServer = self.queueManager.get_server()
            self.logServer.serve_forever()
        except:
            raise RuntimeError("Couldn't start the logging server!")

class LoggingProcess(object):
    def __init__(self, address, pwd):
        self.logServerAddr = address
        self.logServerPwd = pwd
        try:
            QueueManager.register('getQueue')
            self.queueManager = QueueManager(address = self.logServerAddr, authkey = self.logServerPwd)
            self.queueManager.connect()
        except:
            raise RuntimeError("Couldn't connect logging process to the logging server!")
        self.msgQueue  = self.queueManager.getQueue()
        self.process = Process(target=self.loggingProcess, name = "Logging Process", args=(), daemon = True)
        self.process.start()

    def terminate(self):
        self.msgQueue.join()
        self.process.terminate()

    def loggingProcess(self):
        while True:
            logObj = self.msgQueue.get()
            print(logObj)

@Singleton
class Logger(object):
    def __init__(self, address, pwd):
        self.logServerAddr = address
        self.logServerPwd = pwd
        self.queueManager = None
        self.msgQueue  = None

    def connectToLogServer(self):
        try:
            QueueManager.register('getQueue')
            self.queueManager = QueueManager(address = self.logServerAddr, authkey = self.logServerPwd)
            self.queueManager.connect()
            self.msgQueue  = self.queueManager.getQueue()
            self.ready = True
        except:
            raise RuntimeError("Couldn't connect logger to Log Server!")

    def ReadyCheck(func):
        def makeDecorator(self, *args, **kwargs):
            if not self.msgQueue:
                self.connectToLogServer()
            func(self, *args, **kwargs)
        return makeDecorator

    # Overrided function to log info
    @ReadyCheck
    def info(self, info, logfile = sys.stdout):
        self.msgQueue.put(info)

address = ('', 50000)
password = b'PASSWORD'
log = Logger(address, password)

def callback(*args):
    #print("Finished!!!")
    pass

def job(index):
    time.sleep(0.1)
    log.info(str(log.msgQueue) + ":{}".format(index))
    log.info("here {}".format(index))


if __name__ == "__main__":
    # import multiprocessing
    # logger = multiprocessing.log_to_stderr()
    # logger.setLevel(multiprocessing.SUBDEBUG)
    serverProcess = Process(target = LoggingServer, name = "LoggingServerDaemon", args = ((address, password)), daemon = True)
    serverProcess.start()
    time.sleep(1)
    loggingProcess = LoggingProcess(address, password)
    log.info("Starting...")
    #pool = Pool(cpu_count())
    pool = Pool() #Using a small number of worker(like 10), no problem, but if we increase to a bigger number, say 48 in my case, this program hangs every time...
    results = [pool.apply_async(job, (i,), callback = callback) for i in range(1)]
    pool.close()
    pool.join()
    log.info("Done")
    #loggingProcess.terminate()
    #serverProcess.terminate()

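The LoggingServer class acts as the log server (similar to a proxy) and manages the shared queue. The LoggingProcess class is the log consumer: it fetches logs from the shared queue (managed by LoggingServer). The Logger class is the producer: it puts logs into the shared queue. Because I want to share one global logger across multiple modules so that the log format, output location, and so on are unified (similar to the logging standard library), the Logger class is not fully initialized when it is constructed; initialization is completed later, on first use (see connectToLogServer). I strongly suspect this is the root cause of the hang, but I can't get any further.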
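As a rough illustration of the three roles described above, here is a minimal single-process sketch. It is not the code from the question: the server runs in a daemon thread instead of a separate process, the names `ServerManager`, `ClientManager`, `start_queue_server`, `connect_queue` and `demo` are hypothetical, and port 0 is used so the operating system picks a free port. Two manager classes are used because registering `getQueue` twice on one class in the same process would overwrite the server-side callable.

```python
import queue
import threading
from multiprocessing.managers import BaseManager


class ServerManager(BaseManager):
    """Manager class for the serving side (hypothetical name)."""


class ClientManager(BaseManager):
    """Manager class for the connecting side (hypothetical name)."""


def start_queue_server(authkey):
    """Serve one shared queue.Queue from a daemon thread; return its address."""
    shared = queue.Queue()
    ServerManager.register('getQueue', callable=lambda: shared)
    # Port 0 lets the OS choose a free port (an assumption for this sketch;
    # the question uses the fixed port 50000).
    manager = ServerManager(address=('127.0.0.1', 0), authkey=authkey)
    server = manager.get_server()
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server.address


def connect_queue(address, authkey):
    """Connect to the server and return a proxy for the shared queue."""
    ClientManager.register('getQueue')
    manager = ClientManager(address=address, authkey=authkey)
    manager.connect()
    return manager.getQueue()


def demo():
    authkey = b'PASSWORD'
    address = start_queue_server(authkey)
    producer = connect_queue(address, authkey)  # plays the Logger role
    consumer = connect_queue(address, authkey)  # plays the LoggingProcess role
    producer.put("here 0")
    return consumer.get(timeout=5)
```

Calling `demo()` sends one message through the proxy on the producer side and reads it back on the consumer side. Every proxy call here is a network round trip to the server, which is also true of the proxies used in the question.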

The traceback of a hanging child process (ForkPoolWorker), captured with py-spy, is as follows:

Process 3958088: python3 Logger.py
Python v3.9.0 (/usr/bin/python3.9)
Thread 3958088 (idle): "MainThread"
_recv (/usr/lib/python3.9/multiprocessing/connection.py:384)
_recv_bytes (/usr/lib/python3.9/multiprocessing/connection.py:419)
recv_bytes (/usr/lib/python3.9/multiprocessing/connection.py:221)
answer_challenge (/usr/lib/python3.9/multiprocessing/connection.py:757)
Client (/usr/lib/python3.9/multiprocessing/connection.py:513)
_decref (/usr/lib/python3.9/multiprocessing/managers.py:861)
__call__ (/usr/lib/python3.9/multiprocessing/util.py:224)
_run_finalizers (/usr/lib/python3.9/multiprocessing/util.py:300)
_exit_function (/usr/lib/python3.9/multiprocessing/util.py:334)
_bootstrap (/usr/lib/python3.9/multiprocessing/process.py:318)
_launch (/usr/lib/python3.9/multiprocessing/popen_fork.py:71)
__init__ (/usr/lib/python3.9/multiprocessing/popen_fork.py:19)
_Popen (/usr/lib/python3.9/multiprocessing/context.py:277)
start (/usr/lib/python3.9/multiprocessing/process.py:121)
_repopulate_pool_static (/usr/lib/python3.9/multiprocessing/pool.py:326)
_repopulate_pool (/usr/lib/python3.9/multiprocessing/pool.py:303)
__init__ (/usr/lib/python3.9/multiprocessing/pool.py:212)
Pool (/usr/lib/python3.9/multiprocessing/context.py:119)
<module> (/slowfs/cn59sig01/usr/zhuoc/work/qualification-kit/reproducer/Logger.py:129)

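It seems that the refcount of the shared queue fails to be decremented. I have searched a lot, but nothing I found matches this case, so I am bringing the problem here. Any comments and suggestions are appreciated!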
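In the traceback above, `_decref` is reached through `_run_finalizers` inside `_exit_function`, that is, as an exit-time finalizer of the worker process. The sketch below illustrates that mechanism in isolation using `multiprocessing.util.Finalize`, which is an internal, undocumented helper, so treat its use here as an assumption about CPython's implementation rather than a supported API. `FakeProxy` and `fake_decref` are hypothetical stand-ins; nothing in this sketch talks to a manager server.

```python
from multiprocessing import util


class FakeProxy:
    """Stand-in for a manager proxy (hypothetical, for illustration only)."""


def fake_decref(log, token_id):
    # A real proxy would open a connection to the manager server here,
    # which is the step where the hanging worker is blocked.
    log.append(("decref", token_id))


def demo_finalizer():
    log = []
    proxy = FakeProxy()
    # Register a callback tied to the proxy object. Finalizers that have an
    # exit priority are also run when the process exits.
    finalizer = util.Finalize(proxy, fake_decref,
                              args=(log, "queue-1"), exitpriority=10)
    active_before = finalizer.still_active()
    finalizer()  # runs the callback and unregisters the finalizer
    finalizer()  # second call does nothing: already unregistered
    return log, active_before, finalizer.still_active()
```

If this reading is right, every pool worker that inherited the queue proxy runs such a callback when it exits, so a large pool produces many near-simultaneous connections to the server at shutdown.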

Update: traceback after pressing CTRL+C:

Starting...
<queue.Queue object at 0x7fbe145d6190>:0
here 0
^CProcess ForkPoolWorker-49:
Process ForkPoolWorker-34:
Process ForkPoolWorker-29:
Process LoggingServerDaemon:
Traceback (most recent call last):
File "/slowfs/cn59sig01/usr/zhuoc/work/qualification-kit/reproducer/Logger.py", line 134, in <module>
pool.join()
File "/usr/lib/python3.9/multiprocessing/pool.py", line 666, in join
p.join()
File "/usr/lib/python3.9/multiprocessing/process.py", line 149, in join
res = self._popen.wait(timeout)
File "/usr/lib/python3.9/multiprocessing/popen_fork.py", line 43, in wait
return self.poll(os.WNOHANG if timeout == 0.0 else 0)
File "/usr/lib/python3.9/multiprocessing/popen_fork.py", line 27, in poll
pid, sts = os.waitpid(self.pid, flag)
Traceback (most recent call last):
KeyboardInterrupt
Process Logging Process:
Traceback (most recent call last):
Traceback (most recent call last):
File "/usr/lib/python3.9/multiprocessing/process.py", line 318, in _bootstrap
util._exit_function()
File "/usr/lib/python3.9/multiprocessing/util.py", line 334, in _exit_function
_run_finalizers(0)
File "/usr/lib/python3.9/multiprocessing/util.py", line 300, in _run_finalizers
finalizer()
File "/usr/lib/python3.9/multiprocessing/util.py", line 224, in __call__
res = self._callback(*self._args, **self._kwargs)
File "/usr/lib/python3.9/multiprocessing/managers.py", line 861, in _decref
conn = _Client(token.address, authkey=authkey)
File "/usr/lib/python3.9/multiprocessing/process.py", line 318, in _bootstrap
util._exit_function()
File "/usr/lib/python3.9/multiprocessing/process.py", line 318, in _bootstrap
util._exit_function()
File "/usr/lib/python3.9/multiprocessing/connection.py", line 513, in Client
answer_challenge(c, authkey)
File "/usr/lib/python3.9/multiprocessing/util.py", line 334, in _exit_function
_run_finalizers(0)
File "/usr/lib/python3.9/multiprocessing/util.py", line 334, in _exit_function
_run_finalizers(0)
File "/usr/lib/python3.9/multiprocessing/connection.py", line 757, in answer_challenge
message = connection.recv_bytes(256)         # reject large message
File "/usr/lib/python3.9/multiprocessing/util.py", line 300, in _run_finalizers
finalizer()
File "/usr/lib/python3.9/multiprocessing/util.py", line 300, in _run_finalizers
finalizer()
File "/usr/lib/python3.9/multiprocessing/connection.py", line 221, in recv_bytes
buf = self._recv_bytes(maxlength)
File "/usr/lib/python3.9/multiprocessing/util.py", line 224, in __call__
res = self._callback(*self._args, **self._kwargs)
File "/usr/lib/python3.9/multiprocessing/connection.py", line 419, in _recv_bytes
buf = self._recv(4)
File "/usr/lib/python3.9/multiprocessing/util.py", line 224, in __call__
res = self._callback(*self._args, **self._kwargs)
File "/usr/lib/python3.9/multiprocessing/managers.py", line 861, in _decref
conn = _Client(token.address, authkey=authkey)
File "/usr/lib/python3.9/multiprocessing/connection.py", line 384, in _recv
chunk = read(handle, remaining)
File "/usr/lib/python3.9/multiprocessing/managers.py", line 861, in _decref
conn = _Client(token.address, authkey=authkey)
File "/usr/lib/python3.9/multiprocessing/connection.py", line 513, in Client
answer_challenge(c, authkey)
File "/usr/lib/python3.9/multiprocessing/connection.py", line 513, in Client
answer_challenge(c, authkey)
File "/usr/lib/python3.9/multiprocessing/connection.py", line 757, in answer_challenge
message = connection.recv_bytes(256)         # reject large message
File "/usr/lib/python3.9/multiprocessing/connection.py", line 757, in answer_challenge
message = connection.recv_bytes(256)         # reject large message
File "/usr/lib/python3.9/multiprocessing/connection.py", line 221, in recv_bytes
buf = self._recv_bytes(maxlength)
KeyboardInterrupt
File "/usr/lib/python3.9/multiprocessing/connection.py", line 221, in recv_bytes
buf = self._recv_bytes(maxlength)
File "/usr/lib/python3.9/multiprocessing/connection.py", line 419, in _recv_bytes
buf = self._recv(4)
File "/usr/lib/python3.9/multiprocessing/connection.py", line 419, in _recv_bytes
buf = self._recv(4)
File "/usr/lib/python3.9/multiprocessing/connection.py", line 384, in _recv
chunk = read(handle, remaining)
File "/usr/lib/python3.9/multiprocessing/connection.py", line 384, in _recv
chunk = read(handle, remaining)
KeyboardInterrupt
KeyboardInterrupt
Traceback (most recent call last):
File "/slowfs/cn59sig01/usr/zhuoc/work/qualification-kit/reproducer/Logger.py", line 43, in __init__
self.logServer.serve_forever()
File "/usr/lib/python3.9/multiprocessing/managers.py", line 183, in serve_forever
sys.exit(0)
SystemExit: 0
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
Traceback (most recent call last):
File "/usr/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/usr/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/usr/lib/python3.9/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/slowfs/cn59sig01/usr/zhuoc/work/qualification-kit/reproducer/Logger.py", line 68, in loggingProcess
logObj = self.msgQueue.get()
File "<string>", line 2, in get
File "/usr/lib/python3.9/multiprocessing/managers.py", line 809, in _callmethod
kind, result = conn.recv()
File "/usr/lib/python3.9/multiprocessing/connection.py", line 255, in recv
buf = self._recv_bytes()
File "/usr/lib/python3.9/multiprocessing/connection.py", line 419, in _recv_bytes
buf = self._recv(4)
File "/usr/lib/python3.9/multiprocessing/connection.py", line 384, in _recv
chunk = read(handle, remaining)
KeyboardInterrupt
File "/usr/lib/python3.9/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/slowfs/cn59sig01/usr/zhuoc/work/qualification-kit/reproducer/Logger.py", line 45, in __init__
raise RuntimeError("Couldn't start the logging server!")
RuntimeError: Couldn't start the logging server!

This is a bug in Python; see https://bugs.python.org/issue44155
