记录多进程/多线程python脚本导致死锁



我正面临从以下脚本收集日志的问题。一旦我将SLEEP_TIME设置为太"小"的值,LoggingThread就会停止运行线程以某种方式阻塞了日志模块。日志请求时脚本冻结在action函数中。当SLEEP_TIME约为0.1时,脚本收集

我试着遵循这个答案,但它解决不了我的问题。

import multiprocessing
import threading
import logging
import time
SLEEP_TIME = 0.000001
logger = logging.getLogger()
ch = logging.StreamHandler()
ch.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(funcName)s(): %(message)s'))
ch.setLevel(logging.DEBUG)
logger.setLevel(logging.DEBUG)
logger.addHandler(ch)

class LoggingThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
    def run(self):
        while True:
            logger.debug('LoggingThread: {}'.format(self))
            time.sleep(SLEEP_TIME)

def action(i):
    logger.debug('action: {}'.format(i))

def do_parallel_job():
    processes = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=processes)
    for i in range(20):
        pool.apply_async(action, args=(i,))
    pool.close()
    pool.join()

if __name__ == '__main__':
    logger.debug('START')
    #
    # multithread part
    #
    for _ in range(10):
        lt = LoggingThread()
        lt.setDaemon(True)
        lt.start()
    #
    # multiprocess part
    #
    do_parallel_job()
    logger.debug('FINISH')

如何在多进程和多线程脚本中使用日志模块?

这可能是错误6721。

这个问题在任何有锁、线程和分叉的情况下都很常见。如果线程1有锁,而线程2调用fork,在fork进程中,将只有线程2,并且锁将永远被持有。在您的情况下,这是logging.StreamHandler.lock

可以在这里(永久链接)找到logging模块的修复。注意,您还需要处理其他锁。

我最近在使用日志模块和Pathos多处理库时遇到了类似的问题。仍然不能100%确定,但似乎,在我的情况下,问题可能是由事实引起的,日志处理程序试图从不同的进程中重用锁对象。

可以用一个简单的包装器来修复它:

import threading
from collections import defaultdict
from multiprocessing import current_process
import colorlog

class ProcessSafeHandler(colorlog.StreamHandler):
    def __init__(self):
        super().__init__()
        self._locks = defaultdict(lambda: threading.RLock())
    def acquire(self):
        current_process_id = current_process().pid
        self._locks[current_process_id].acquire()
    def release(self):
        current_process_id = current_process().pid
        self._locks[current_process_id].release()

默认情况下,multiprocessing将在Linux上运行时对池中的进程进行fork()。结果子进程将丢失除主进程之外的所有正在运行的线程。所以如果你使用的是Linux,这就是问题所在。

第一个操作项:你永远不应该使用基于fork()的池;参见https://pythonspeed.com/articles/python-multiprocessing/和https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods。

在Windows上,我认为在macOS上的新版本的Python,使用基于"spawn"的池。这也是您应该在Linux上使用的。在此设置中,将启动一个新的Python进程。如您所料,新进程没有父进程的任何线程,因为它是一个新进程。

第二个操作项:您将希望在池中的每个子进程中完成日志设置;父进程的日志记录设置不足以从工作进程获取日志。您可以使用initializer关键字参数Pool来完成此操作,例如编写一个名为setup_logging()的函数,然后执行pool = multiprocessing.Pool(initializer=setup_logging) (https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing.pool)。

最新更新