Multiprocessing logging - how to use loguru with joblib Parallel



I have a bunch of Python scripts that run some data science models. They take quite a while, and the only way to speed them up is to use multiprocessing. To achieve this I use the joblib library, and it works really well. Unfortunately, however, this messes up the logging, and the console output is garbled too (expectedly so, though), since all the processes dump their respective outputs at the same time.

I am new to using the logging library and followed some other SO answers to try to make it work. I am using 8 cores for processing. Following the answers on SO, I wrote out to log files and expected 8 new files on every iteration. However, it created 8 files on the first iteration and only wrote/appended to those same 8 files on every loop. This was a bit inconvenient, so I explored some more and found loguru and logzero. While they both cover examples using multiprocessing, neither of them shows how to use it with joblib. Here is what I have so far:

run_models.py

import math
import multiprocessing
import time
from datetime import datetime
from loguru import logger
import pandas as pd
import psutil
from joblib import Parallel, delayed
import helper
import log
import prep_data
import stock_subscriber_data
import train_model

def get_pred(cust_df, stock_id, logger):
    logger.info('--------------------------------Stock loop {}-------------------------------'.format(stock_id))
    cust_stockid_df = stock_subscriber_data.get_stockid_data(cust_df, stock_id)
    weekly_timeseries, last_date, abn_df = prep_data.prep(cust_stockid_df, logger)
    single_row_df = stock_subscriber_data.get_single_row(cust_df, stock_id)
    stock_subscriber_data.write_data(abn_df, 't1')
    test_y, prd = train_model.read_train_write(cust_df, stock_id, weekly_timeseries, last_date, logger)
    return True

def main():
    cust_df = stock_subscriber_data.get_data()
    cust_df = helper.clean_data(cust_df)
    stock_list = cust_df['intStockID'].unique()
    max_proc = max(math.ceil(((psutil.virtual_memory().total >> 30) - 100) / 50), 1)
    num_cores = min(multiprocessing.cpu_count(), max_proc)
    logger.add("test_loguru.log", format="{time} {level}: ({file}:{module} - {line}) >> {message}", level="INFO", enqueue=True)
    Parallel(n_jobs=num_cores)(delayed(get_pred)(cust_df, s, logger) for s in stock_list)

if __name__ == "__main__":
    main()

train_model.py

import math
from datetime import datetime
from itertools import product
from math import sqrt
import pandas as pd
from keras import backend
from keras.layers import Dense
from keras.layers import LSTM
from keras.models import Sequential
from numpy import array
from numpy import mean
from pandas import DataFrame
from pandas import concat
from sklearn.metrics import mean_squared_error
import helper
import stock_subscriber_data
# bunch of functions here that don't need logging...
# walk-forward validation for univariate data
def walk_forward_validation(logger, data, n_test, cfg):
    # ... do stuff here ...
    # ... and here ...
    logger.info('{0:.3f}'.format(error))
    return error, model

# score a model, return None on failure
def repeat_evaluate(logger, data, config, n_test, n_repeats=10):
    # ... do stuff here ...
    # ... and here ...
    logger.info('> Model{0} {1:.3f}'.format(key, result))
    return key, result, best_model

def read_train_write(data_df, stock_id, series, last_date, logger):
    # ... do stuff here ...
    # ... and here ...
    logger.info('done')
    # ... do stuff here ...
    # ... and here ...
    # bunch of logger.info() statements here...
    #
    #
    #
    #
    # ... do stuff here ...
    # ... and here ...
    return test_y, prd

This works great when there is only one process at a time. However, when running in multiprocessing mode, I get a "_pickle.PicklingError: Could not pickle the task to send it to the workers." error. What am I doing wrong? How can I fix this? I don't mind switching to something other than loguru or logzero, as long as I can create one file with coherent logs, or even n files, each containing the logs of one joblib iteration.

Core concept: both the main process and each child process need to call logger.add(). You can pipe all the logs into the same file by using the same file name.

# Pseudo-code to get the idea
def main():
    logfile = 'execution.log'
    # Use enqueue=True to ensure it works properly with multiprocessing
    logger.add(logfile, enqueue=True)
    ...
    # Add logfile to the params passed to get_pred
    Parallel(n_jobs=num_cores)(delayed(get_pred)(cust_df, s, logfile) for s in stock_list)

# Add logfile as a param to get_pred
def get_pred(cust_df, stock_id, logfile):
    # Add the *same* logfile each time the child process is called!
    logger.add(logfile, enqueue=True)
    # Add identifiers to log messages to distinguish them
    logger.info(f'{stock_id} - more info')
    # ...

In @CodinginCircles' answer, inside the get_pred() function they call logger.add() with a unique log file name every time. That creates many different log files.

Instead, you can call logger.add() with the same name every time, and all the logs will go to the same log file. Setting enqueue=True helps ensure this works correctly with multiprocessing.

To know which log line corresponds to which thing (stocks, in our case), just add the stock name to the log message, e.g. logger.info(f'{stock_id} - more info').

I have also found adding the arguments backtrace=True, diagnose=True to logger.add() particularly helpful when working with Loguru.

Final note: you could also define LOGFILE = 'execution.log' as a constant outside the function definitions, and then you would not need to pass it as a parameter to get_pred(). The approach outlined above gives you more control over the log file name, e.g. giving it a unique time signature.
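Putting those pieces together, here is a minimal, self-contained sketch of the approach (not the asker's actual pipeline): process_stock() is a hypothetical stand-in for the real get_pred()/model code, and the timestamped LOGFILE name is just one way to give the file a unique time signature.

import math
from datetime import datetime

from joblib import Parallel, delayed
from loguru import logger

# Optional: give the log file a unique time signature instead of a fixed name.
LOGFILE = 'execution_{}.log'.format(datetime.now().strftime('%Y%m%d_%H%M%S'))

def process_stock(stock_id, logfile):
    # Each worker adds the *same* file sink; enqueue=True serializes the writes.
    logger.add(logfile, level='INFO', enqueue=True, backtrace=True, diagnose=True)
    logger.info('{} - starting'.format(stock_id))
    result = math.sqrt(stock_id)  # placeholder for the real model work
    logger.info('{} - done, result={:.3f}'.format(stock_id, result))
    return result

def main():
    logger.add(LOGFILE, level='INFO', enqueue=True, backtrace=True, diagnose=True)
    logger.info('main - starting run')
    Parallel(n_jobs=4)(delayed(process_stock)(s, LOGFILE) for s in range(8))
    logger.info('main - all stocks finished')

if __name__ == '__main__':
    main()

Because only the logfile string is passed to the workers (never the logger object), there is nothing unpicklable in the task, and all messages land interleaved but intact in one file, each prefixed with its stock_id.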

I got it working by modifying my run_models.py. I now get one log file per loop. This creates a lot of log files, but each one corresponds to a single loop and nothing is jumbled up. One step at a time, I guess. Here is what I did:

run_models.py

import math
import multiprocessing
import time
from datetime import datetime
from loguru import logger
import pandas as pd
import psutil
from joblib import Parallel, delayed
import helper
import log
import prep_data
import stock_subscriber_data
import train_model

def get_pred(cust_df, stock_id):
    log_file_name = "log_file_{}".format(stock_id)
    logger.add(log_file_name, format="{time} {level}: ({file}:{module} - {line}) >> {message}", level="INFO", enqueue=True)
    logger.info('--------------------------------Stock loop {}-------------------------------'.format(stock_id))
    cust_stockid_df = stock_subscriber_data.get_stockid_data(cust_df, stock_id)
    weekly_timeseries, last_date, abn_df = prep_data.prep(cust_stockid_df, logger)
    single_row_df = stock_subscriber_data.get_single_row(cust_df, stock_id)
    stock_subscriber_data.write_data(abn_df, 't1')
    test_y, prd = train_model.read_train_write(cust_df, stock_id, weekly_timeseries, last_date, logger)
    return True

def main():
    cust_df = stock_subscriber_data.get_data()
    cust_df = helper.clean_data(cust_df)
    stock_list = cust_df['intStockID'].unique()
    max_proc = max(math.ceil(((psutil.virtual_memory().total >> 30) - 100) / 50), 1)
    num_cores = min(multiprocessing.cpu_count(), max_proc)
    Parallel(n_jobs=num_cores)(delayed(get_pred)(cust_df, s) for s in stock_list)

if __name__ == "__main__":
    main()
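One caveat with this version: joblib's default backend reuses worker processes, and loguru sinks stay registered until they are removed, so each call to get_pred() in the same worker stacks another file sink on top of the earlier ones, and later stocks can also end up written into earlier stocks' files. A minimal sketch of one way to avoid that, using the handler id that logger.add() returns (only the logging-related lines are shown):

def get_pred(cust_df, stock_id):
    log_file_name = "log_file_{}".format(stock_id)
    # logger.add() returns a handler id; keep it so this sink can be detached later
    handler_id = logger.add(log_file_name,
                            format="{time} {level}: ({file}:{module} - {line}) >> {message}",
                            level="INFO", enqueue=True)
    try:
        logger.info('--------------------------------Stock loop {}-------------------------------'.format(stock_id))
        # ... same body as above ...
        return True
    finally:
        # detach this stock's sink so the next task run by the same worker starts clean
        logger.remove(handler_id)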

So the right way to use loguru with joblib is to change the backend to multiprocessing:

import sys

from loguru import logger
from joblib import Parallel, delayed
from tqdm.autonotebook import tqdm

logger.remove()
logger.add(sys.stdout, level='INFO', enqueue=True)
logger.info('test')
logger.debug('should not appear')

def do_thing(i):
    logger.info('item %i' % i)
    logger.debug('should not appear')
    return None

Parallel(n_jobs=4, backend='multiprocessing')(
    delayed(do_thing)(i)
    for i in tqdm(range(10))
)

Parallel(n_jobs=4)(
    delayed(do_thing)(i)
    for i in tqdm(range(10))
)

The first Parallel call works. The second one runs into the same old problem mentioned earlier.
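For completeness, here is the same pattern with a file sink instead of sys.stdout. This is only a sketch: the parallel_run.log name is made up, and it assumes a platform where the multiprocessing backend forks the parent process (e.g. Linux), so the workers inherit the enqueued sinks added before the Parallel call.

import sys

from joblib import Parallel, delayed
from loguru import logger

def do_thing(i):
    # the forked workers inherit the sinks configured below
    logger.info('item {}'.format(i))
    return None

if __name__ == '__main__':
    logger.remove()  # drop the default stderr sink
    # enqueue=True routes every message through a queue, keeping the file coherent
    logger.add('parallel_run.log', level='INFO', enqueue=True)
    logger.add(sys.stdout, level='INFO', enqueue=True)

    Parallel(n_jobs=4, backend='multiprocessing')(
        delayed(do_thing)(i) for i in range(10)
    )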
