同时使用多个不同的 Python 日志处理程序时出错



我在Python 3.7中有不同的日志处理程序。

第一是SqliteHandler,第二是FileHandler,第三是ConsoleHandler

  • 当我单独使用SqliteHandler时,它工作得很好
  • 当我同时使用 FileHandler 和 ConsoleHandler 时,FileHandler 和 ConsoleHandler 都工作良好

但是当我尝试同时使用 SqliteHandler 和 FileHandler 时,Python 在 FileHandler 处抛出了异常:

TypeError: not all arguments converted during string formatting(类型错误:在字符串格式化期间,并非所有参数都已转换)

我认为问题在于 SqliteHandler 和 ConsoleHandler 的格式不同。我尝试统一格式,但是“我的统一格式”对它们没有帮助。(见方法 setHandler:如果在配置中找不到 formatter 项,就使用硬编码的默认格式。)

import sqlite3
import logging
import datetime
import time
import sys
import json
import os
import tempfile

# DDL template for the log table. %(tablename)s is the only placeholder; it
# is filled in with the % operator and a mapping that provides "tablename".
initial_sql = """CREATE TABLE IF NOT EXISTS %(tablename)s(
logcheck_loggingID INTEGER PRIMARY KEY AUTOINCREMENT,
Created datetime,
Source TEXT,
LogLevel INT,
LogLevelName TEXT,
Message TEXT,
Args TEXT,
Module TEXT,
FuncName TEXT,
LineNo INT,
Exception TEXT,
Process INT,
Thread TEXT,
ThreadName TEXT
)"""
# INSERT template with one named %-placeholder per column, filled in with the
# % operator and a mapping. Text values are placed between single quotes, so
# a single quote inside a substituted value must already be doubled.
insertion_sql = """INSERT INTO %(tablename)s(
Created,
Source,
LogLevel,
LogLevelName,
Message,
Args,
Module,
FuncName,
LineNo,
Exception,
Process,
Thread,
ThreadName
)
VALUES (
'%(created_sqlText)s',
'%(name)s',
%(levelno)d,
'%(levelname)s',
'%(msg)s',
'%(args)s',
'%(module)s',
'%(funcName)s',
%(lineno)d,
'%(exc_text)s',
%(process)d,
'%(thread)s',
'%(threadName)s'
);
"""
def addRecordMembers(record, tablename: str):
    """Attach two helper attributes to a log record, in place.

    ``record.created_sqlText`` receives the creation time rendered as
    ``YYYY-MM-DD HH:MM:SS,mmm`` (seconds from ``record.created``,
    milliseconds from ``record.msecs``) and ``record.tablename`` receives
    *tablename*.  Nothing is returned.
    """
    stamp = datetime.datetime.fromtimestamp(record.created)
    seconds_part = stamp.strftime("%Y-%m-%d %H:%M:%S")
    record.created_sqlText = "%s,%03d" % (seconds_part, record.msecs)
    record.tablename = tablename
class SQLiteLoggingHandler(logging.Handler):
"""
Logging handler that writes each record as one row into a SQLite table.

The database file and the table name are read from ``configuration``
("dbFilePath" and "tableName"); the table is created on construction if
it does not exist yet.

NOTE(review): the original docstring called this handler thread-safe.
Each call opens its own connection, but that claim is not verified here.
"""
configuration : dict
def __init__(self, configuration: dict):
super().__init__()
# Apply the level and formatter described by the configuration.
setHandler(self,configuration)
self.configuration = configuration
# Make sure the target table exists before the first record arrives.
conn = sqlite3.connect(self.configuration["dbFilePath"])
sql = initial_sql % {"tablename": self.configuration["tableName"]}
conn.execute(sql)
conn.commit()
def emit(self, record):
# NOTE(review): this method overwrites attributes of ``record`` itself
# (exc_text, name, levelname, msg, args, module, funcName, threadName).
# The logging framework hands the same record object to every handler of
# the logger, and ``record.args`` is turned into a string below, so a later
# ``msg % args`` on this record no longer sees the original argument tuple.
self.format(record)
addRecordMembers(record,self.configuration["tableName"])

if record.exc_info:  # for exceptions
record.exc_text = logging._defaultFormatter.formatException(
record.exc_info)
#    record.exc_text = record.exc_text.replace("'","''")         ## added for fixing quotes causing error
else:
record.exc_text = ""

# Double every single quote so the values can sit inside '...' SQL literals.
record.name = record.name.replace("'", "''")
record.levelname = record.levelname.replace("'", "''")
record.msg = record.msg .replace("'", "''")
record.args = str(record.args).replace("'", "''")
record.module = record.module.replace("'", "''")
record.funcName = record.funcName.replace("'", "''")
record.exc_text = record.exc_text.replace("'", "''")
record.threadName = record.threadName.replace("'", "''")
# Insert the log record
sql = insertion_sql % record.__dict__
conn = sqlite3.connect(self.configuration["dbFilePath"])
conn.execute(sql)
conn.commit()  # not efficient, but hopefully thread-safe
pass

def setHandler(handler: logging.Handler, config: dict):
    """Apply level and formatter settings from *config* to *handler*.

    ``config["loggingLevel"]`` is a level name that is translated with
    ``logging.getLevelName``.  ``config["formatter"]`` is an optional format
    string; when the key is missing a built-in semicolon-separated layout is
    used.  Either way the format string is wrapped in ``MyFormatter``.
    """
    handler.setLevel(logging.getLevelName(config["loggingLevel"]))
    fallback_layout = "%(asctime)s;%(levelname)s;%(name)s;%(module)s;%(funcName)s;%(lineno)d;%(message)s"
    layout = config["formatter"] if "formatter" in config else fallback_layout
    handler.setFormatter(MyFormatter(layout))

class MyFileHandler(logging.FileHandler):
    """File handler configured from a dictionary.

    The log file path is read from ``configuration["logFilePath"]``; level
    and formatter are applied by ``setHandler``.
    """

    configuration: dict

    def __init__(self, configuration: dict):
        log_path = configuration["logFilePath"]
        super().__init__(log_path)
        self.configuration = configuration
        setHandler(self, configuration)

    def emit(self, record):
        """Add the helper attributes to *record*, then write it to the file."""
        addRecordMembers(record, "fileHandler")
        super().emit(record)
class MyConsoleHandler(logging.StreamHandler):
    """Stream handler configured from a dictionary.

    It is constructed without an explicit stream; level and formatter are
    applied by ``setHandler``.
    """

    configuration: dict

    def __init__(self, configuration: dict):
        super().__init__()
        self.configuration = configuration
        setHandler(self, configuration)

    def emit(self, record):
        """Add the helper attributes to *record*, then write it to the stream."""
        addRecordMembers(record, "consoleHandler")
        super().emit(record)

class MyFormatter(logging.Formatter):
    """Formatter whose ``%(asctime)s`` is ``YYYY-MM-DD HH:MM:SS,mmm``."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def formatTime(self, record, datefmt=None) -> str:
        """Return the creation time of *record* with milliseconds appended.

        *datefmt* is accepted to match the base-class signature but is not
        used; the layout is fixed.
        """
        moment = datetime.datetime.fromtimestamp(record.created)
        return "%s,%03d" % (moment.strftime("%Y-%m-%d %H:%M:%S"), record.msecs)
# Set to True once load() has attached at least one handler.
loaded = False


def load(logger: logging.Logger, config: dict) -> bool:
    """Attach the handlers described in *config* to *logger*.

    Recognised top-level keys are ``"SQLiteLoggingHandler"``,
    ``"FileHandler"`` and ``"ConsoleHandler"``.  Each handler's settings are
    printed before the handler is created.  Once any handler has been
    attached, later calls do nothing.

    Returns the module-level ``loaded`` flag.
    """
    global loaded
    if loaded:
        return loaded

    if "SQLiteLoggingHandler" in config:
        sql_cfg = config["SQLiteLoggingHandler"]
        for key in ("dbFilePath", "tableName", "loggingLevel"):
            print(f"Add sqlite logging... {key}: {sql_cfg[key]}")
        logger.addHandler(SQLiteLoggingHandler(sql_cfg))
        loaded = True

    if "FileHandler" in config:
        file_cfg = config["FileHandler"]
        for key in ("logFilePath", "loggingLevel", "formatter"):
            print(f"Add file logging... {key}: {file_cfg[key]}")
        logger.addHandler(MyFileHandler(file_cfg))
        loaded = True

    if "ConsoleHandler" in config:
        console_cfg = config["ConsoleHandler"]
        for key in ("loggingLevel", "formatter"):
            print(f"Add Console logging... {key}: {console_cfg[key]}")
        logger.addHandler(MyConsoleHandler(console_cfg))
        loaded = True

    if not loaded:
        print(
            "SQLiteLoggingHandler  or FileHandler or ConsoleHandler item not found in logging configuration.")
    return loaded

def test_main():
    """Set up console and SQLite logging on the root logger and log twice.

    Output files live in the system temp directory and are removed first if
    they are left over from an earlier run.  The file handler entry is stored
    under the key ``"DISABLED_FileHandler"``, which ``load`` does not look
    for, so no file handler is attached.
    """
    tmp_dir = tempfile.gettempdir()
    dbFilePath = os.path.join(tmp_dir, "loggingHandlers.sqlite")
    logFilePath = os.path.join(tmp_dir, "loggingHandlers.log")
    for stale in (dbFilePath, logFilePath):
        if os.path.exists(stale):
            os.remove(stale)

    base_layout = "%(asctime)s;%(levelname)s;%(name)s;%(module)s;%(funcName)s;%(lineno)d;%(message)s"
    config = {
        "ConsoleHandler": {
            "loggingLevel": "DEBUG",
            "formatter": base_layout + ";%(threadName)s",
        },
        "SQLiteLoggingHandler": {
            "dbFilePath": dbFilePath,
            "tableName": "monit_logging",
            "loggingLevel": "DEBUG",
        },
        "DISABLED_FileHandler": {
            "logFilePath": logFilePath,
            "loggingLevel": "DEBUG",
            "formatter": base_layout,
        },
    }

    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    load(root, config)
    # test
    logging.info('Start')
    logging.info('End')


if __name__ == '__main__':
    test_main()

问题的原因在于方法 SQLiteLoggingHandler.emit:它修改了传入的 record 对象(例如把属性 record.args 替换成了字符串)。同一个 record 对象随后还会传给其他处理程序,它们在执行 msg % args 格式化时就会抛出上述 TypeError,因此其余的日志处理程序不起作用。

修复后的代码为:

myRecord = {}
...
myRecord["args"] = str(record.args).replace("'", "''")
...
sql = insertion_sql % myRecord

而不是

record.args = str(record.args).replace("'", "''")
...
sql = insertion_sql % record.__dict__

修复后模块的完整代码为:

import sqlite3
import logging
import datetime
import time
import sys
import json
import os
import tempfile

# DDL template for the log table. %(tablename)s is the only placeholder; it
# is filled in with the % operator and a mapping that provides "tablename".
initial_sql = """CREATE TABLE IF NOT EXISTS %(tablename)s(
logcheck_loggingID INTEGER PRIMARY KEY AUTOINCREMENT,
Created datetime,
Source TEXT,
LogLevel INT,
LogLevelName TEXT,
Message TEXT,
Args TEXT,
Module TEXT,
FuncName TEXT,
LineNo INT,
Exception TEXT,
Process INT,
Thread TEXT,
ThreadName TEXT
)"""
# INSERT template with one named %-placeholder per column, filled in with the
# % operator and a mapping. Text values are placed between single quotes, so
# a single quote inside a substituted value must already be doubled.
insertion_sql = """INSERT INTO %(tablename)s(
Created,
Source,
LogLevel,
LogLevelName,
Message,
Args,
Module,
FuncName,
LineNo,
Exception,
Process,
Thread,
ThreadName
)
VALUES (
'%(created_sqlText)s',
'%(name)s',
%(levelno)d,
'%(levelname)s',
'%(msg)s',
'%(args)s',
'%(module)s',
'%(funcName)s',
%(lineno)d,
'%(exc_text)s',
%(process)d,
'%(thread)s',
'%(threadName)s'
);
"""

def addRecordMembers(record, tablename: str):
    """Add ``created_sqlText`` and ``tablename`` attributes to *record*.

    ``created_sqlText`` is the record's creation time as
    ``YYYY-MM-DD HH:MM:SS,mmm``, built from ``record.created`` and
    ``record.msecs``.  The record is changed in place; the function returns
    nothing.
    """
    created_at = datetime.datetime.fromtimestamp(record.created)
    record.created_sqlText = "%s,%03d" % (
        created_at.strftime("%Y-%m-%d %H:%M:%S"),
        record.msecs,
    )
    record.tablename = tablename

class SQLiteLoggingHandler(logging.Handler):
    """Logging handler that stores every record as one row in a SQLite table.

    ``configuration`` must provide ``"dbFilePath"`` (database file),
    ``"tableName"`` (target table) and ``"loggingLevel"``; ``"formatter"`` is
    optional.  The table is created on construction when it is missing.

    The handler never modifies the ``LogRecord`` it receives, because the
    logging framework passes the same record object on to every other handler
    of the logger.  Column values are bound as SQL parameters, so no manual
    quote escaping is needed.  A connection is opened and closed per record.
    """

    configuration: dict

    # Column values go through "?" placeholders. Only the table name is
    # interpolated, because SQL identifiers cannot be bound as parameters;
    # it comes from the handler configuration, not from log content.
    _INSERT_SQL = (
        "INSERT INTO %s("
        "Created, Source, LogLevel, LogLevelName, Message, Args, Module, "
        "FuncName, LineNo, Exception, Process, Thread, ThreadName"
        ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
    )

    def __init__(self, configuration: dict):
        """Configure the handler and create the log table if necessary.

        Raises ``KeyError`` when a required configuration key is missing and
        ``sqlite3.Error`` when the database cannot be opened or the table
        cannot be created.
        """
        super().__init__()
        setHandler(self, configuration)
        self.configuration = configuration
        conn = sqlite3.connect(self.configuration["dbFilePath"])
        try:
            with conn:  # commits on success, rolls back on error
                conn.execute(
                    initial_sql % {"tablename": self.configuration["tableName"]})
        finally:
            conn.close()

    def emit(self, record):
        """Insert *record* into the configured table without changing it.

        The raw message template (``record.msg``) and the stringified
        arguments (``record.args``) are stored in separate columns.  Any
        failure is routed to ``handleError`` so that a logging problem does
        not propagate into the code that issued the log call.
        """
        try:
            created = datetime.datetime.fromtimestamp(record.created)
            created_text = "%s,%03d" % (
                created.strftime("%Y-%m-%d %H:%M:%S"), record.msecs)
            if record.exc_info:
                exc_text = logging.Formatter().formatException(record.exc_info)
            else:
                exc_text = ""
            row = (
                created_text,
                record.name,
                record.levelno,
                record.levelname,
                str(record.msg),  # msg may be any object, not only a str
                str(record.args),
                record.module,
                record.funcName,
                record.lineno,
                exc_text,
                record.process,
                str(record.thread),
                record.threadName,
            )
            sql = self._INSERT_SQL % self.configuration["tableName"]
            conn = sqlite3.connect(self.configuration["dbFilePath"])
            try:
                with conn:  # one transaction per record
                    conn.execute(sql, row)
            finally:
                conn.close()
        except Exception:
            self.handleError(record)

def setHandler(handler: logging.Handler, config: dict):
    """Set the level and formatter of *handler* from *config*.

    The level name in ``config["loggingLevel"]`` is resolved through
    ``logging.getLevelName``.  If ``config`` has a ``"formatter"`` entry it
    is used as the format string; otherwise a default semicolon-separated
    layout is used.  The format string is always wrapped in ``MyFormatter``.
    """
    level = logging.getLevelName(config["loggingLevel"])
    handler.setLevel(level)
    if "formatter" in config:
        layout = config["formatter"]
    else:
        layout = "%(asctime)s;%(levelname)s;%(name)s;%(module)s;%(funcName)s;%(lineno)d;%(message)s"
    handler.setFormatter(MyFormatter(layout))

class MyFileHandler(logging.FileHandler):
    """File handler whose path, level and formatter come from a dictionary.

    Formatting is inherited unchanged from ``logging.FileHandler``.
    """

    configuration: dict

    def __init__(self, configuration: dict):
        target_path = configuration["logFilePath"]
        super().__init__(target_path)
        self.configuration = configuration
        setHandler(self, configuration)

    def emit(self, record):
        """Add the helper attributes to *record*, then write it to the file."""
        addRecordMembers(record, "fileHandler")
        super().emit(record)
class MyConsoleHandler(logging.StreamHandler):
    """Stream handler whose level and formatter come from a dictionary.

    Formatting is inherited unchanged from ``logging.StreamHandler``.
    """

    configuration: dict

    def __init__(self, configuration: dict):
        super().__init__()
        self.configuration = configuration
        setHandler(self, configuration)

    def emit(self, record):
        """Add the helper attributes to *record*, then write it to the stream."""
        addRecordMembers(record, "consoleHandler")
        super().emit(record)

class MyFormatter(logging.Formatter):
    """Formatter that renders ``%(asctime)s`` with millisecond precision."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def formatTime(self, record, datefmt=None) -> str:
        """Return ``YYYY-MM-DD HH:MM:SS,mmm`` for the creation time of *record*.

        *datefmt* exists only to match the base-class signature; it is not
        consulted.
        """
        seconds_text = datetime.datetime.fromtimestamp(
            record.created).strftime("%Y-%m-%d %H:%M:%S")
        return "%s,%03d" % (seconds_text, record.msecs)
# Set to True once load() has attached at least one handler.
loaded = False


def load(logger: logging.Logger, config: dict) -> bool:
    """Attach the handlers described in *config* to *logger*.

    Recognised top-level keys are ``"SQLiteLoggingHandler"``,
    ``"FileHandler"`` and ``"ConsoleHandler"``.  The settings of each handler
    are printed before it is created.  The ``"formatter"`` entry of a handler
    section is optional; a missing entry is reported as ``<default>`` instead
    of raising ``KeyError``.  Once any handler has been attached, later calls
    do nothing.

    Returns the module-level ``loaded`` flag.
    Raises ``KeyError`` when a required entry of a handler section is missing.
    """
    global loaded
    if loaded:
        return loaded

    if "SQLiteLoggingHandler" in config:
        sqlConfig = config["SQLiteLoggingHandler"]
        for key in ("dbFilePath", "tableName", "loggingLevel"):
            print(f"Add sqlite logging... {key}: {sqlConfig[key]}")
        logger.addHandler(SQLiteLoggingHandler(sqlConfig))
        loaded = True

    if "FileHandler" in config:
        fileConfig = config["FileHandler"]
        for key in ("logFilePath", "loggingLevel"):
            print(f"Add file logging... {key}: {fileConfig[key]}")
        print(
            f'Add file logging... formatter: {fileConfig.get("formatter", "<default>")}')
        logger.addHandler(MyFileHandler(fileConfig))
        loaded = True

    if "ConsoleHandler" in config:
        consoleConfig = config["ConsoleHandler"]
        print(
            f'Add Console logging... loggingLevel: {consoleConfig["loggingLevel"]}')
        print(
            f'Add Console logging... formatter: {consoleConfig.get("formatter", "<default>")}')
        logger.addHandler(MyConsoleHandler(consoleConfig))
        loaded = True

    if not loaded:
        print(
            "SQLiteLoggingHandler  or FileHandler or ConsoleHandler item not found in logging configuration.")
    return loaded

def test_main():
    """Log two messages through all three handlers and verify the SQLite rows.

    Output files live in the system temp directory and are removed first if
    they are left over from an earlier run.  After logging ``Start`` and
    ``End`` on the root logger, the ``monit_logging`` table is read back and
    compared with the expected two rows.

    Raises ``AssertionError`` when the stored rows differ from the expected
    ones.
    """
    dbFilePath = os.path.join(tempfile.gettempdir(), "loggingHandlers.sqlite")
    logFilePath = os.path.join(tempfile.gettempdir(), "loggingHandlers.log")
    for stale in (dbFilePath, logFilePath):
        if os.path.exists(stale):
            os.remove(stale)

    config = {
        "ConsoleHandler": {
            "loggingLevel": "DEBUG",
            "formatter": "%(asctime)s;%(levelname)s;%(name)s;%(module)s;%(funcName)s;%(lineno)d;%(message)s;%(threadName)s",
        },
        "SQLiteLoggingHandler": {
            "dbFilePath": dbFilePath,
            "tableName": "monit_logging",
            "loggingLevel": "DEBUG",
        },
        "FileHandler": {
            "logFilePath": logFilePath,
            "loggingLevel": "DEBUG",
            "formatter": "%(asctime)s;%(levelname)s;%(name)s;%(module)s;%(funcName)s;%(lineno)d;%(message)s",
        },
    }

    print(dbFilePath)
    print(logFilePath)
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    load(logger, config)
    # test
    logging.info('Start')
    logging.info('End')

    # Read the rows back; close the connection even if the query fails.
    conn = sqlite3.connect(dbFilePath)
    try:
        cursor = conn.execute("SELECT * FROM monit_logging")
        columnnames = [description[0] for description in cursor.description]
        print(columnnames)
        message_index = columnnames.index('Message')
        prints = [
            f"{columnnames[0]}: {r[0]}, {r[message_index]}"
            for r in cursor.fetchall()
        ]
    finally:
        conn.close()

    expected = "logcheck_loggingID: 1, Start\nlogcheck_loggingID: 2, End"
    actual = "\n".join(prints)
    print(actual)
    assert expected == actual


if __name__ == '__main__':
    test_main()


最新更新