Question
In the main process, I instantiate several instances of a class whose method runs in parallel and which should each log to their own log file. Before and after they do their work, some events from the main process should be logged to another file.
Since there is no parallel access to the same file at any point during the execution of the program, I don't use a queue to serialize the logging events. I simply use a base logger and, for every module, a separate logger that inherits from that base logger.
My problem now is that the class instances that execute their methods in parallel use functions from a utils module. The logger in this utils module should log to the file of the class instance that is using it, which, as far as I know, it can only do if it knows the correct name of that instance's logger.
Example code
I reduced the real code to a minimal working example to help understand my problem better. In the main module, I instantiate a base logger named 'Main' that only has a StreamHandler and from which every other logger in the application inherits:
# Content of main.py
import logging
import multiprocessing
import time

from worker import Worker
from container import Container

logger = logging.getLogger('Main')

def setup_base_logger():
    formatter = logging.Formatter('%(asctime)s - %(name)-14s - %(levelname)8s - %(message)s')
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)

if __name__ == '__main__':
    multiprocessing.freeze_support()
    setup_base_logger()
    logger.warning('Starting the main program')
    container = Container([Worker(name='Worker_Nr.%d' % i) for i in range(4)])
    container.run()
The Container class is defined in container.py and just holds a list of Worker instances:
# Content of container.py
import logging
import multiprocessing

logger = logging.getLogger('Main.container')

def run_worker(worker):
    worker.run()

class Container:
    def __init__(self, workers):
        self.workers = workers

    def run(self):
        logger.warning('The workers begin to run ...')
        pool = multiprocessing.Pool(processes=4, maxtasksperchild=1)
        pool.map(run_worker, self.workers)
        logger.warning('Workers finished running.')
Its task is to execute the run() method of the workers in parallel. I use multiprocessing.Pool because I need to restrict the number of processors that are used. The Worker class is defined in the module worker.py:
# Content of worker.py
import logging
import os
import time

import util

def configure_logger(name, logfile):
    logger = logging.getLogger(name)
    formatter = logging.Formatter('%(asctime)s - %(name)-14s - %(levelname)-8s - %(message)s')
    file_handler = logging.FileHandler(logfile, mode='w')
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)

class Worker:
    def __init__(self, name):
        self.name = name
        self.run_time = 2
        logger_name = 'Main.worker.' + name
        configure_logger(name=logger_name, logfile=self.name + '.log')
        self.logger = logging.getLogger(logger_name)

    def __getstate__(self):
        d = self.__dict__.copy()
        if 'logger' in d:
            d['logger'] = d['logger'].name
        return d

    def __setstate__(self, d):
        if 'logger' in d:
            d['logger'] = logging.getLogger(d['logger'])
        self.__dict__.update(d)

    def run(self):
        self.logger.warning('{0} is running for {1} seconds with process id {2}'.format(self.name, self.run_time, os.getpid()))
        time.sleep(self.run_time)
        util.print_something(os.getpid())
        self.logger.warning('{} woke up!'.format(self.name))
I thought Worker needs a logger instance as an attribute if every instance of Worker is supposed to have its own log file. The utils module looks like this:
# Content of util.py
import logging

logger = logging.getLogger('Main.util')

def print_something(s):
    print(s)
    logger.warning('%s was just printed', s)
Executing main.py gives the following output:
2017-05-03 11:08:05,738 - Main - WARNING - Starting the main program
2017-05-03 11:08:05,740 - Main.container - WARNING - The workers begin to run ...
Worker_Nr.0 is running for 2 seconds with process id 5532
Worker_Nr.1 is running for 2 seconds with process id 17908
Worker_Nr.2 is running for 2 seconds with process id 19796
Worker_Nr.3 is running for 2 seconds with process id 10804
5532
5532 was just printed
Worker_Nr.0 woke up!
17908
19796
17908 was just printed
19796 was just printed
Worker_Nr.1 woke up!
Worker_Nr.2 woke up!
10804
10804 was just printed
Worker_Nr.3 woke up!
2017-05-03 11:08:07,941 - Main.container - WARNING - Workers finished running.
As you can see, the log records created by the Worker instances are missing all formatting. Also, the created log files don't have any content. How is that possible when a handler with a formatter is added via configure_logger() in Worker.__init__?
What I have tried
- Passing the logger name to every function in the utils module. This works, but it seems overly complicated, since there are a lot of functions in util.py and more modules are used in the same way (see the sketch after this list).
- Similar questions about logging in multiprocessing applications usually want to log to the same file from different processes; I want a separate log file for each process.
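Roughly, the first workaround amounts to something like the following sketch (the extra logger_name parameter and its name are only illustrative, not my real code):

# Sketch of the first workaround: every util function receives the caller's logger name.
import logging

def print_something(s, logger_name):
    # The Worker instance passes self.logger.name when calling this function.
    logger = logging.getLogger(logger_name)
    print(s)
    logger.warning('%s was just printed', s)

The call in Worker.run() then becomes util.print_something(os.getpid(), self.logger.name), so every function and every call site has to carry the extra argument.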
Questions
- How can the log records created in the utils module (and possibly other modules) get to the correct log file?
- Everything logged from the Worker instances is emitted to stdout without any formatting, and nothing is written to the log files (although they are created). Why is that?
I am using Python 3.5.1 on Windows 7, 64 bit.
If you think it is easier to use a Queue and a logging thread in the main process, that would be completely acceptable. My only concern is the order of the log records, but I guess I could sort them afterwards, as suggested in some other posts.
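For reference, this is roughly how I picture that Queue-based alternative, assuming the standard logging.handlers.QueueHandler / QueueListener pattern and a single combined log file (the file name and the initializer function are made up for the sketch):

# Sketch of the Queue-based alternative: all pool processes push their records
# onto one shared queue and a listener thread in the main process writes them out.
import logging
import logging.handlers
import multiprocessing

def worker_init(queue):
    # Runs once in every pool process: route every record into the shared queue.
    root = logging.getLogger()
    root.addHandler(logging.handlers.QueueHandler(queue))
    root.setLevel(logging.DEBUG)

if __name__ == '__main__':
    queue = multiprocessing.Manager().Queue()
    file_handler = logging.FileHandler('all_workers.log', mode='w')
    file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)-14s - %(levelname)-8s - %(message)s'))
    listener = logging.handlers.QueueListener(queue, file_handler)
    listener.start()

    pool = multiprocessing.Pool(processes=4, initializer=worker_init, initargs=(queue,))
    # pool.map(run_worker, workers) would go here
    pool.close()
    pool.join()
    listener.stop()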
I am at my wit's end, so any help or a hint in the right direction is much appreciated.
Answer
You have to repeat configure_logger(name=logger_name, logfile=self.name + '.log') for each process. On Windows the pool processes start as fresh interpreters, so the FileHandler you add in Worker.__init__ exists only in the parent process; in the children the logger has no handlers, the records fall back to logging's last-resort handler (which prints them without any formatting), and the log files stay empty. Configure the logger inside run(), which executes in the child process:
def run(self):
    logger_name = 'Main.worker.' + self.name   # rebuild the logger name in the child process
    configure_logger(name=logger_name, logfile=self.name + '.log')
    ...
In this minimal example I was able to reproduce the original error that prompted you to modify the Worker class so that it could be pickled:
import logging
import multiprocessing
import time

def configure_logger(name, logfile):
    logger = logging.getLogger(name)
    formatter = logging.Formatter('%(asctime)s - %(name)-14s - %(levelname)-8s - %(message)s')
    file_handler = logging.FileHandler(logfile, mode='w')
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)
    logger.setLevel(logging.DEBUG)

class Worker:
    def __init__(self, number):
        self.name = "worker%d" % number
        self.log_file = "%s.log" % self.name
        configure_logger(self.name, self.log_file)
        self.logger = logging.getLogger(self.name)

    def run(self):
        self.logger.info("%s is running...", self.name)
        time.sleep(1.0)
        self.logger.info("%s is exiting...", self.name)

def run_worker(worker):
    worker.run()

N = 4
workers = [Worker(n) for n in range(N)]
pool = multiprocessing.Pool(processes=N, maxtasksperchild=1)
pool.map(run_worker, workers)
Here is the traceback from running this program:
Traceback (most recent call last):
  File "custom.py", line 31, in <module>
    pool.map(run_worker, workers)
  File "/usr/local/Cellar/python/2.7.12_2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 251, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/usr/local/Cellar/python/2.7.12_2/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 567, in get
    raise self._value
TypeError: can't pickle thread.lock objects
The solution is not to change how the Worker class is pickled, but to move the logging.getLogger call into the run method:
class Worker:
    def __init__(self, number):
        self.name = "worker%d" % number
        self.log_file = "%s.log" % self.name
        configure_logger(self.name, self.log_file)

    def run(self):
        self.logger = logging.getLogger(self.name)
        self.logger.info("%s is running...", self.name)
        time.sleep(1.0)
        self.logger.info("%s is exiting...", self.name)
With this change, the program runs and the expected log files are produced.
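Putting both points together and applying them to the Worker class from the question, worker.py would look roughly like this (a sketch, not a drop-in replacement: the logger name is rebuilt from self.name inside run(), and __getstate__/__setstate__ are no longer needed because the instance holds no logger when it is pickled):

# Content of worker.py, reworked: all logging setup happens inside run(),
# so the FileHandler is created in the pool process that actually executes it.
import logging
import os
import time

import util

def configure_logger(name, logfile):
    logger = logging.getLogger(name)
    formatter = logging.Formatter('%(asctime)s - %(name)-14s - %(levelname)-8s - %(message)s')
    file_handler = logging.FileHandler(logfile, mode='w')
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)

class Worker:
    def __init__(self, name):
        self.name = name
        self.run_time = 2
        # No logger attribute is created here, so the instance pickles without tricks.

    def run(self):
        logger_name = 'Main.worker.' + self.name      # rebuilt in the child process
        configure_logger(name=logger_name, logfile=self.name + '.log')
        self.logger = logging.getLogger(logger_name)
        self.logger.warning('%s is running for %s seconds with process id %s',
                            self.name, self.run_time, os.getpid())
        time.sleep(self.run_time)
        util.print_something(os.getpid())
        self.logger.warning('%s woke up!', self.name)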