How to buffer each worker process's log records and flush them when the process completes

Here is what I have so far:

import logging, logging.handlers
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager

def worker_process(q):
    qh = logging.handlers.QueueHandler(q)
    memoryhandler = logging.handlers.MemoryHandler(
        10000,
        logging.CRITICAL,
        target=qh,
        flushOnClose=True
    )

    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    root.addHandler(memoryhandler)
    for i in range(4):
        logger = logging.getLogger()
        logger.log(logging.ERROR, 'Message no. %d', i)

    memoryhandler.close()

if __name__ == '__main__':
    q = Manager().Queue()
    with ProcessPoolExecutor(max_workers=3) as executor:
        for i in range(4):
            executor.submit(worker_process, q)
    logger = logging.getLogger()
    while True:
        record = q.get()
        logger.handle(record)

According to the MemoryHandler documentation, I expected my memory handler to buffer the log records and flush them to the queue all at once when memoryhandler.close() is called.
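For comparison, a minimal single-process sketch of that documented behaviour: records are held back and only flushed, in order, when close() is called.

import logging, logging.handlers
import sys

target = logging.StreamHandler(sys.stderr)
memoryhandler = logging.handlers.MemoryHandler(
    10000,
    logging.CRITICAL,   # flushLevel: nothing below CRITICAL forces an early flush
    target=target,
    flushOnClose=True
)
root = logging.getLogger()
root.setLevel(logging.DEBUG)
root.addHandler(memoryhandler)

for i in range(4):
    root.log(logging.ERROR, 'Message no. %d', i)

print('nothing flushed yet', file=sys.stderr)
memoryhandler.close()   # all four records are emitted here, in order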

However, that is not what happens here:

Message no. 0
Message no. 0
Message no. 1
Message no. 0
Message no. 1
Message no. 2
Message no. 3
Message no. 1
Message no. 2
Message no. 2
Message no. 3
Message no. 3
Message no. 0
Message no. 1
Message no. 2
Message no. 3

Why is this happening?

Is there a way to get the following output using the logging library?

Message no. 0
Message no. 1
Message no. 2
Message no. 3
Message no. 0
Message no. 1
Message no. 2
Message no. 3
Message no. 0
Message no. 1
Message no. 2
Message no. 3
Message no. 0
Message no. 1
Message no. 2
Message no. 3

The problem is that you have forgotten this is a multiprocessing application: your processes are writing to the passed queue in parallel, so the log records arrive in the arbitrary order you see. The exception is the last 4 records, because the pool only has 3 processes: the 4th submitted task does not actually run until the first 3 have terminated, and it therefore runs all by itself.

There is the additional problem that your main process never terminates, because of the while True: loop. This can be solved by having each task write an extra sentinel record (e.g. None) to the queue after its real log records. The main process then loops until it has seen 4 of these sentinel records, which it otherwise ignores, of course; the drain loop is sketched below.
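A minimal, self-contained sketch of that sentinel pattern (the queue is pre-filled here to stand in for the workers; SENTINEL and NUM_TASKS are illustrative names, and the complete solution further down applies the same idea across processes):

import logging, queue

SENTINEL = None          # marker a worker puts on the queue when it is done
NUM_TASKS = 2            # how many sentinel records to wait for

q = queue.Queue()
# Simulate two finished workers: each wrote one record, then its sentinel.
for msg in ('from worker A', 'from worker B'):
    q.put(logging.makeLogRecord(
        {'name': 'demo', 'msg': msg,
         'levelno': logging.ERROR, 'levelname': 'ERROR'}))
    q.put(SENTINEL)

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger()

seen_sentinels = 0
while seen_sentinels < NUM_TASKS:
    record = q.get()
    if record is SENTINEL:
        seen_sentinels += 1       # one more task has finished
    else:
        logger.handle(record)     # re-emit the record in this process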

The solution is to use a regular MemoryHandler, which does not flush its records until close is called (and, to make sure that stays true regardless of the level of the records being logged, we can subclass it and override the shouldFlush method), but then to make sure that all the records are flushed under control of a Lock, so that only one process flushes at a time:

import logging, logging.handlers
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Lock, cpu_count
import sys

class MyMemoryHandler(logging.handlers.MemoryHandler):
    def shouldFlush(self, record):
        """
        Check only whether the buffer is full.
        """
        return len(self.buffer) >= self.capacity

def init_pool(the_lock):
    global lock
    lock = the_lock

def worker_process():
    memoryhandler = MyMemoryHandler(
        10000,
        logging.CRITICAL,
        target=logging.StreamHandler(sys.stderr),
        flushOnClose=True
    )
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    root.addHandler(memoryhandler)
    for i in range(4):
        logger = logging.getLogger()
        # You can even log CRITICAL-level records now without triggering a flush:
        logger.log(logging.CRITICAL, 'Message no. %d', i)
    with lock:
        memoryhandler.close()

if __name__ == '__main__':
    lock = Lock()
    # Note we are now using a pool size of 4
    with ProcessPoolExecutor(max_workers=4, initializer=init_pool, initargs=(lock,)) as executor:
        for i in range(4):
            executor.submit(worker_process)

Prints:

Message no. 0
Message no. 1
Message no. 2
Message no. 3
Message no. 0
Message no. 1
Message no. 2
Message no. 3
Message no. 0
Message no. 1
Message no. 2
Message no. 3
Message no. 0
Message no. 1
Message no. 2
Message no. 3

If you want to use a queue as before instead of the infinite loop, you can use the code modified as follows. Note that I have replaced the managed queue with a multiprocessing.Queue instance, which performs much better. However, to make sure there is no deadlock, the main process must not wait until the worker processes have completed before reading the messages off the queue: with no reader, a writing process can block trying to put records on the queue. That is why I have moved the code that reads records off the queue inside the with ProcessPoolExecutor(...) as executor: block. I have also made the queue instance available to the worker function by initializing a global q variable in each pool process (it cannot be passed as an argument, or everything hangs when the pool processes the task).

import logging, logging.handlers
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Queue, Lock, cpu_count

class MyMemoryHandler(logging.handlers.MemoryHandler):
    def shouldFlush(self, record):
        """
        Check only whether the buffer is full.
        """
        return len(self.buffer) >= self.capacity

def init_pool(the_lock, the_queue):
    global lock, q
    lock = the_lock
    q = the_queue

SENTINEL = None

def worker_process():
    qh = logging.handlers.QueueHandler(q)
    memoryhandler = MyMemoryHandler(
        10000,
        logging.CRITICAL,
        target=qh,
        flushOnClose=True
    )
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    root.addHandler(memoryhandler)
    for i in range(4):
        logger = logging.getLogger()
        # You can even log CRITICAL-level records now without triggering a flush:
        logger.log(logging.CRITICAL, 'Message no. %d', i)
    with lock:
        memoryhandler.close()
        q.put(SENTINEL)  # write sentinel

if __name__ == '__main__':
    lock = Lock()
    q = Queue()
    N_TASKS = 4
    with ProcessPoolExecutor(max_workers=min(cpu_count(), N_TASKS), initializer=init_pool, initargs=(lock, q)) as executor:
        for i in range(N_TASKS):
            executor.submit(worker_process)
        logger = logging.getLogger()
        seen_sentinels = 0
        while seen_sentinels < N_TASKS:
            record = q.get()
            if record == SENTINEL:
                seen_sentinels += 1
            else:
                logger.handle(record)

Prints:

Message no. 0
Message no. 1
Message no. 2
Message no. 3
Message no. 0
Message no. 1
Message no. 2
Message no. 3
Message no. 0
Message no. 1
Message no. 2
Message no. 3
Message no. 0
Message no. 1
Message no. 2
Message no. 3

LogHandler

The LogHandler class below encapsulates much of the logic scattered through the code above. We do not need to keep a reference to the LogHandler instance we create (instantiating it is enough), and we do not need to call close explicitly either: logging.shutdown, which the logging package registers to run at interpreter exit, flushes and closes every handler.

import logging, logging.handlers
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Lock, cpu_count, current_process
import sys

class LogHandler(logging.handlers.MemoryHandler):
    def __init__(self,
                 lock,
                 *,
                 level=logging.DEBUG,
                 stream=sys.stdout,
                 capacity=10000,
                 format=False
                 ):
        self._lock = lock  # a "suitable" lock
        stream_handler = logging.StreamHandler(stream)
        if format:
            formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            stream_handler.setFormatter(formatter)
        super().__init__(capacity,
                         logging.CRITICAL,
                         target=stream_handler,
                         flushOnClose=True
                         )
        root = logging.getLogger()
        root.setLevel(level)
        root.addHandler(self)

    def shouldFlush(self, record):
        """
        Check only whether the buffer is full.
        """
        return len(self.buffer) >= self.capacity

    def flush(self):
        """ Serialize flushing across processes. """
        with self._lock:
            super().flush()

####################################################

def init_pool(the_lock):
    global lock
    lock = the_lock

def worker_process():
    import time
    # give each process in the pool a chance to run:
    time.sleep(.1)
    LogHandler(lock, format=True)  # formatted output
    logger = logging.getLogger(str(current_process().pid))
    for i in range(4):
        time.sleep(.5)
        logger.log(logging.CRITICAL, 'message no. %d', i)

if __name__ == '__main__':
    lock = Lock()
    with ProcessPoolExecutor(max_workers=min(cpu_count(), 4), initializer=init_pool, initargs=(lock,)) as executor:
        for i in range(4):
            executor.submit(worker_process)

Prints:

2021-07-05 07:22:07,925 - 20148 - CRITICAL - message no. 0
2021-07-05 07:22:08,425 - 20148 - CRITICAL - message no. 1
2021-07-05 07:22:08,926 - 20148 - CRITICAL - message no. 2
2021-07-05 07:22:09,426 - 20148 - CRITICAL - message no. 3
2021-07-05 07:22:07,926 - 10864 - CRITICAL - message no. 0
2021-07-05 07:22:08,426 - 10864 - CRITICAL - message no. 1
2021-07-05 07:22:08,927 - 10864 - CRITICAL - message no. 2
2021-07-05 07:22:09,427 - 10864 - CRITICAL - message no. 3
2021-07-05 07:22:07,929 - 8528 - CRITICAL - message no. 0
2021-07-05 07:22:08,429 - 8528 - CRITICAL - message no. 1
2021-07-05 07:22:08,930 - 8528 - CRITICAL - message no. 2
2021-07-05 07:22:09,430 - 8528 - CRITICAL - message no. 3
2021-07-05 07:22:07,930 - 21200 - CRITICAL - message no. 0
2021-07-05 07:22:08,430 - 21200 - CRITICAL - message no. 1
2021-07-05 07:22:08,931 - 21200 - CRITICAL - message no. 2
2021-07-05 07:22:09,431 - 21200 - CRITICAL - message no. 3
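Because LogHandler accepts any writable stream, the workers could just as well buffer to a shared log file; a hypothetical variant of worker_process (the file name pool.log is illustrative; append mode plus the lock-serialized flush keeps each process's block of records contiguous):

def worker_process():
    # Buffer to a shared file instead of stdout; everything else is unchanged.
    LogHandler(lock, stream=open('pool.log', 'a'), format=True)
    logger = logging.getLogger(str(current_process().pid))
    for i in range(4):
        logger.log(logging.CRITICAL, 'message no. %d', i)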
