如何在 Pathos 中使用多处理正确进行日志记录



我有一个并行程序。它按顺序读取文件,每个文件中的任务跨进程拆分。在所有进程上完成文件后,将加载下一个文件,依此类推。我想写入日志文件,以便每个数据文件都有一个新的日志文件。我希望我的所有进程都写入日志信息,并且它们不会相互干扰。在阅读了一些帖子和日志记录文档后,我想出了以下最小示例

import numpy as np
import matplotlib.pyplot as plt
from time import time
import multiprocessing, pathos
import logging
def task(x):
thisID = pathos.core.getpid()
logger.info("Process " + str(thisID) + ": Processing stuff " + x)
return 1
for iJob in range(3):
# Create file handler
fh = logging.FileHandler('log'+str(iJob)+'_pathos.txt')
fh.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s')
fh.setFormatter(formatter)
logger = pathos.logger(level=logging.DEBUG, handler=fh)
pool = pathos.multiprocessing.ProcessingPool(7)
results_mp = pool.map(task, list("aalkfnalkgnlkaerngnarngkwlekfwebkwr"))
logger.removeHandler(fh)
print(results_mp)

无论我尝试什么,所有输出都转到第一个日志文件,其他两个都已创建但保持为空。使用裸multiprocessing的替代实现似乎工作正常(见下文(。问题是我需要 pathos,因为它允许我并行化一些导入的库,而常规多处理拒绝使用

def task(x):
thisID = multiprocessing.current_process()._identity[0]
logger.info("Process " + str(thisID) + ": Processing stuff " + x)
return 1
for iJob in range(3):
# Create file handler
fh = logging.FileHandler('log'+str(iJob)+'_pathos.txt')
fh.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s')
fh.setFormatter(formatter)
logger = logging.getLogger("MyLogger")
logger.setLevel(logging.DEBUG)
logger.addHandler(fh)
pool = multiprocessing.Pool(7)
results_mp = pool.map(task, list("aalkfnalkgnlkaerngnarngkwlekfwebkwr"))
logger.removeHandler(fh)
print(results_mp)

也许提到我从Jupyter笔记本运行代码很有用。此外,当我运行同一单元两次时,我的行为有些不稳定,删除了两者之间的日志文件。有时新的日志文件都是空的

ProcessPool 创建具有自己内存的新工作线程。所以你不能/不应该访问全局变量。将您需要的所有内容传递到pool.map()

这对我有用:

import numpy as np
import matplotlib.pyplot as plt
from time import time
import multiprocessing, pathos
import logging
def task(x, iJob):
thisID = pathos.core.getpid()
fh = logging.FileHandler('log'+str(iJob)+'_pathos.txt')
fh.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s')
fh.setFormatter(formatter)
logger = pathos.logger(level=logging.DEBUG, handler=fh)
logger.info("Process " + str(thisID) + ": Processing stuff " + x)
logger.removeHandler(fh)
return 1
for iJob in range(3):
# Create file handler
pool = pathos.multiprocessing.ProcessPool(7)
input = "aalkfnalkgnlkaerngnarngkwlekfwebkwr"
results_mp = pool.map(task, list(input), [iJob] * len(input))
print(results_mp)

相关内容

  • 没有找到相关文章

最新更新