我在 Python 3.7 中使用了三个不同的日志处理程序(handler):第一个是 SqliteHandler,第二个是 FileHandler,第三个是 ConsoleHandler。
- 当我单独使用 SqliteHandler 时,它工作正常。
- 当我同时使用 FileHandler 和 ConsoleHandler 时,两者也都工作正常。
但是,当我尝试同时使用 SqliteHandler 和 FileHandler 时,Python 在 FileHandler 上失败,报错如下:
TypeError: not all arguments converted during string formatting(类型错误:在字符串格式化期间,并非所有参数都已转换)
我认为问题在于 SqliteHandler 和 ConsoleHandler 的格式不同。我尝试过统一格式,但"我的统一格式"并没有解决问题(见方法 setHandler:当配置中找不到 formatter 项时,会使用硬编码的默认格式)。
import sqlite3
import logging
import datetime
import time
import sys
import json
import os
import tempfile
# DDL template for the log table. %(tablename)s is substituted with
# %-formatting before the statement is executed.
initial_sql = """CREATE TABLE IF NOT EXISTS %(tablename)s(
logcheck_loggingID INTEGER PRIMARY KEY AUTOINCREMENT,
Created datetime,
Source TEXT,
LogLevel INT,
LogLevelName TEXT,
Message TEXT,
Args TEXT,
Module TEXT,
FuncName TEXT,
LineNo INT,
Exception TEXT,
Process INT,
Thread TEXT,
ThreadName TEXT
)"""
# INSERT template rendered with %-formatting from a mapping of record fields.
# Text values are spliced between single quotes, so any embedded quote has to
# be doubled (' -> '') by the caller before the template is rendered.
insertion_sql = """INSERT INTO %(tablename)s(
Created,
Source,
LogLevel,
LogLevelName,
Message,
Args,
Module,
FuncName,
LineNo,
Exception,
Process,
Thread,
ThreadName
)
VALUES (
'%(created_sqlText)s',
'%(name)s',
%(levelno)d,
'%(levelname)s',
'%(msg)s',
'%(args)s',
'%(module)s',
'%(funcName)s',
%(lineno)d,
'%(exc_text)s',
%(process)d,
'%(thread)s',
'%(threadName)s'
);
"""
def addRecordMembers(record, tablename: str):
    """Attach SQL-oriented helper attributes to a log record.

    Sets ``record.created_sqlText`` to the record's creation time rendered as
    ``YYYY-MM-DD HH:MM:SS,mmm`` (local time, milliseconds taken from
    ``record.msecs``) and ``record.tablename`` to *tablename*.  The record is
    modified in place and nothing is returned.
    """
    stamp = datetime.datetime.fromtimestamp(record.created).strftime(
        "%Y-%m-%d %H:%M:%S")
    # ``%`` binds tighter than ``+``: only the millisecond part is formatted.
    record.created_sqlText = stamp + ",%03d" % record.msecs
    record.tablename = tablename
class SQLiteLoggingHandler(logging.Handler):
    """Logging handler that writes every record as a row of an SQLite table.

    The configuration dict supplies ``dbFilePath``, ``tableName`` and
    ``loggingLevel`` (``formatter`` is optional).  A new connection is opened
    for each emitted record.
    """

    configuration: dict

    def __init__(self, configuration: dict):
        """Apply level/formatter and create the log table if it is missing."""
        super().__init__()
        setHandler(self, configuration)
        self.configuration = configuration
        db = sqlite3.connect(self.configuration["dbFilePath"])
        create_stmt = initial_sql % {
            "tablename": self.configuration["tableName"]}
        db.execute(create_stmt)
        db.commit()

    def emit(self, record):
        """Insert *record* into the configured table.

        NOTE: the quote-escaped values are written back onto *record* itself
        (``record.args`` is replaced by its string form, quotes are doubled
        in ``record.msg`` and the other text fields), so the changes remain
        visible to any code that uses the same record object afterwards.
        """
        self.format(record)
        addRecordMembers(record, self.configuration["tableName"])
        record.exc_text = (
            logging._defaultFormatter.formatException(record.exc_info)
            if record.exc_info  # only set for records carrying an exception
            else ""
        )
        # Double single quotes so the values can sit inside '...' SQL literals.
        for field in ("name", "levelname", "msg", "args", "module",
                      "funcName", "exc_text", "threadName"):
            raw = getattr(record, field)
            text = str(raw) if field == "args" else raw
            setattr(record, field, text.replace("'", "''"))
        statement = insertion_sql % record.__dict__
        db = sqlite3.connect(self.configuration["dbFilePath"])
        db.execute(statement)
        db.commit()  # one commit per record
def setHandler(handler: logging.Handler, config: dict):
    """Apply the level and formatter described by *config* to *handler*.

    ``config["loggingLevel"]`` is passed through ``logging.getLevelName`` and
    the result becomes the handler level.  ``config["formatter"]`` is used as
    the format string of a ``MyFormatter``; when that key is absent a
    hard-coded semicolon-separated layout is used instead.
    """
    fallback_layout = "%(asctime)s;%(levelname)s;%(name)s;%(module)s;%(funcName)s;%(lineno)d;%(message)s"
    handler.setLevel(logging.getLevelName(config["loggingLevel"]))
    handler.setFormatter(MyFormatter(config.get("formatter", fallback_layout)))
class MyFileHandler(logging.FileHandler):
    """File handler whose path, level and format come from a config dict."""

    configuration: dict

    def __init__(self, configuration: dict):
        """Open ``configuration["logFilePath"]`` and apply level/formatter."""
        log_path = configuration["logFilePath"]
        super().__init__(log_path)
        self.configuration = configuration
        setHandler(self, configuration)

    def emit(self, record):
        """Tag *record* via ``addRecordMembers`` and write it to the file."""
        addRecordMembers(record, "fileHandler")
        super().emit(record)
class MyConsoleHandler(logging.StreamHandler):
    """Stream handler (default stream) configured from a config dict."""

    configuration: dict

    def __init__(self, configuration: dict):
        """Create the stream handler and apply level/formatter from config."""
        super().__init__()
        self.configuration = configuration
        setHandler(self, configuration)

    def emit(self, record):
        """Tag *record* via ``addRecordMembers`` and write it to the stream."""
        addRecordMembers(record, "consoleHandler")
        super().emit(record)
class MyFormatter(logging.Formatter):
    """Formatter whose ``%(asctime)s`` reads ``YYYY-MM-DD HH:MM:SS,mmm``."""

    def __init__(self, *args, **kwargs):
        """Forward all arguments unchanged to ``logging.Formatter``."""
        super().__init__(*args, **kwargs)

    def formatTime(self, record, datefmt=None) -> str:
        """Return the record's creation time with milliseconds appended.

        *datefmt* is accepted for interface compatibility but not used; the
        layout is fixed.
        """
        moment = datetime.datetime.fromtimestamp(record.created)
        return "%s,%03d" % (moment.strftime('%Y-%m-%d %H:%M:%S'), record.msecs)
# Module-wide flag: becomes True once load() has attached at least one handler.
loaded = False


def load(logger: logging.Logger, config: dict) -> bool:
    """Attach the handlers named in *config* to *logger* (first call only).

    Recognised top-level keys are ``SQLiteLoggingHandler``, ``FileHandler``
    and ``ConsoleHandler``; each maps to that handler's own configuration
    dict.  Once the module-level ``loaded`` flag is True, further calls do
    nothing.

    Returns the value of ``loaded``: True if any handler has been attached.
    """
    global loaded
    if loaded:
        return loaded

    if "SQLiteLoggingHandler" in config:
        sqlConfig = config["SQLiteLoggingHandler"]
        print(f'Add sqlite logging... dbFilePath: {sqlConfig["dbFilePath"]}')
        print(f'Add sqlite logging... tableName: {sqlConfig["tableName"]}')
        print(f'Add sqlite logging... loggingLevel: {sqlConfig["loggingLevel"]}')
        logger.addHandler(SQLiteLoggingHandler(sqlConfig))
        loaded = True

    if "FileHandler" in config:
        fileConfig = config["FileHandler"]
        print(f'Add file logging... logFilePath: {fileConfig["logFilePath"]}')
        print(f'Add file logging... loggingLevel: {fileConfig["loggingLevel"]}')
        print(f'Add file logging... formatter: {fileConfig["formatter"]}')
        logger.addHandler(MyFileHandler(fileConfig))
        loaded = True

    if "ConsoleHandler" in config:
        consoleConfig = config["ConsoleHandler"]
        print(f'Add Console logging... loggingLevel: {consoleConfig["loggingLevel"]}')
        print(f'Add Console logging... formatter: {consoleConfig["formatter"]}')
        logger.addHandler(MyConsoleHandler(consoleConfig))
        loaded = True

    if not loaded:
        print(
            "SQLiteLoggingHandler or FileHandler or ConsoleHandler item not found in logging configuration.")
    return loaded
def test_main():
    """Smoke run: attach the configured handlers and log two messages.

    Stale database/log files in the temp directory are removed first.  The
    file handler entry is stored under the key ``DISABLED_FileHandler``, which
    ``load`` does not look for, so only the console and SQLite handlers are
    attached.
    """
    temp_dir = tempfile.gettempdir()
    dbFilePath = os.path.join(temp_dir, "loggingHandlers.sqlite")
    logFilePath = os.path.join(temp_dir, "loggingHandlers.log")
    for stale in (dbFilePath, logFilePath):
        if os.path.exists(stale):
            os.remove(stale)

    config = {
        "ConsoleHandler": {
            "loggingLevel": "DEBUG",
            "formatter": "%(asctime)s;%(levelname)s;%(name)s;%(module)s;%(funcName)s;%(lineno)d;%(message)s;%(threadName)s"
        },
        "SQLiteLoggingHandler": {
            "dbFilePath": dbFilePath,
            "tableName": "monit_logging",
            "loggingLevel": "DEBUG"
        },
        "DISABLED_FileHandler": {
            "logFilePath": logFilePath,
            "loggingLevel": "DEBUG",
            "formatter": "%(asctime)s;%(levelname)s;%(name)s;%(module)s;%(funcName)s;%(lineno)d;%(message)s"
        },
    }

    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    load(root, config)

    # Two records through the root logger exercise every attached handler.
    logging.info('Start')
    logging.info('End')


if __name__ == '__main__':
    test_main()
问题出在方法 SQLiteLoggingHandler.emit 上:该方法修改了 record 对象本身(例如把属性 record.args 替换成了字符串),而同一个 record 随后还会传给其他处理程序,因此它们在格式化消息时失败。
修复方法是把转义后的值放进一个单独的字典,而不是写回 record:
myRecord = {}
...
myRecord["args"] = str(record.args).replace("'", "''")
...
sql = insertion_sql % myRecord
而不是
record.args = str(record.args).replace("'", "''")
...
sql = insertion_sql % record.__dict__
修复后模块的完整代码如下:
import sqlite3
import logging
import datetime
import time
import sys
import json
import os
import tempfile
# DDL template for the log table. %(tablename)s is substituted with
# %-formatting before the statement is executed.
initial_sql = """CREATE TABLE IF NOT EXISTS %(tablename)s(
logcheck_loggingID INTEGER PRIMARY KEY AUTOINCREMENT,
Created datetime,
Source TEXT,
LogLevel INT,
LogLevelName TEXT,
Message TEXT,
Args TEXT,
Module TEXT,
FuncName TEXT,
LineNo INT,
Exception TEXT,
Process INT,
Thread TEXT,
ThreadName TEXT
)"""
# INSERT template rendered with %-formatting from a mapping of record fields.
# Text values are spliced between single quotes, so any embedded quote has to
# be doubled (' -> '') by the caller before the template is rendered.
insertion_sql = """INSERT INTO %(tablename)s(
Created,
Source,
LogLevel,
LogLevelName,
Message,
Args,
Module,
FuncName,
LineNo,
Exception,
Process,
Thread,
ThreadName
)
VALUES (
'%(created_sqlText)s',
'%(name)s',
%(levelno)d,
'%(levelname)s',
'%(msg)s',
'%(args)s',
'%(module)s',
'%(funcName)s',
%(lineno)d,
'%(exc_text)s',
%(process)d,
'%(thread)s',
'%(threadName)s'
);
"""
def addRecordMembers(record, tablename: str):
    """Attach SQL-oriented helper attributes to a log record.

    Sets ``record.created_sqlText`` to the record's creation time rendered as
    ``YYYY-MM-DD HH:MM:SS,mmm`` (local time, milliseconds taken from
    ``record.msecs``) and ``record.tablename`` to *tablename*.  The record is
    modified in place and nothing is returned.
    """
    stamp = datetime.datetime.fromtimestamp(record.created).strftime(
        "%Y-%m-%d %H:%M:%S")
    # ``%`` binds tighter than ``+``: only the millisecond part is formatted.
    record.created_sqlText = stamp + ",%03d" % record.msecs
    record.tablename = tablename
class SQLiteLoggingHandler(logging.Handler):
    """Logging handler that stores every record as a row of an SQLite table.

    The configuration dict supplies ``dbFilePath``, ``tableName`` and
    ``loggingLevel`` (``formatter`` is optional).  Each emitted record opens
    its own short-lived connection, inserts one row and commits.

    The record's own fields (``msg``, ``args``, ``name`` ...) are never
    modified, so other handlers receiving the same record can still format
    it.
    """

    configuration: dict

    # Column values are bound as ``?`` parameters, so no manual quote escaping
    # is needed.  Only the table name is spliced in, because SQLite cannot
    # bind identifiers; it comes from the handler configuration.
    _INSERT_TEMPLATE = (
        "INSERT INTO {tablename}("
        "Created, Source, LogLevel, LogLevelName, Message, Args, Module, "
        "FuncName, LineNo, Exception, Process, Thread, ThreadName"
        ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
    )

    def __init__(self, configuration: dict):
        """Apply level/formatter and create the log table if it is missing.

        Args:
            configuration: dict with ``dbFilePath``, ``tableName`` and
                ``loggingLevel``; ``formatter`` is optional.
        """
        super().__init__()
        setHandler(self, configuration)
        self.configuration = configuration
        conn = sqlite3.connect(self.configuration["dbFilePath"])
        try:
            conn.execute(
                initial_sql % {"tablename": self.configuration["tableName"]})
            conn.commit()
        finally:
            conn.close()

    def emit(self, record):
        """Insert *record* as one row of the configured table.

        Failures while formatting or writing are routed to
        ``self.handleError(record)``, the convention used by the standard
        library handlers, so a logging call does not raise because of a
        database problem.
        """
        try:
            self.format(record)

            created = datetime.datetime.fromtimestamp(record.created)
            created_text = "%s,%03d" % (
                created.strftime("%Y-%m-%d %H:%M:%S"), record.msecs)
            # Same extra attributes that addRecordMembers() attaches, so code
            # reading them after this handler keeps working.
            record.created_sqlText = created_text
            record.tablename = self.configuration["tableName"]

            if record.exc_info:  # only set for records carrying an exception
                exc_text = logging.Formatter().formatException(record.exc_info)
            else:
                exc_text = ""

            row = (
                created_text,
                record.name,
                record.levelno,
                record.levelname,
                str(record.msg),   # msg may be any object, not only str
                str(record.args),
                record.module,
                record.funcName,
                record.lineno,
                exc_text,
                record.process,
                str(record.thread),
                record.threadName,
            )
            statement = self._INSERT_TEMPLATE.format(
                tablename=self.configuration["tableName"])

            conn = sqlite3.connect(self.configuration["dbFilePath"])
            try:
                conn.execute(statement, row)
                conn.commit()  # one commit per record: simple, not fast
            finally:
                conn.close()
        except Exception:
            self.handleError(record)
def setHandler(handler: logging.Handler, config: dict):
    """Apply the level and formatter described by *config* to *handler*.

    ``config["loggingLevel"]`` is passed through ``logging.getLevelName`` and
    the result becomes the handler level.  ``config["formatter"]`` is used as
    the format string of a ``MyFormatter``; when that key is absent a
    hard-coded semicolon-separated layout is used instead.
    """
    fallback_layout = "%(asctime)s;%(levelname)s;%(name)s;%(module)s;%(funcName)s;%(lineno)d;%(message)s"
    handler.setLevel(logging.getLevelName(config["loggingLevel"]))
    handler.setFormatter(MyFormatter(config.get("formatter", fallback_layout)))
class MyFileHandler(logging.FileHandler):
    """File handler whose path, level and format come from a config dict."""

    configuration: dict

    def __init__(self, configuration: dict):
        """Open ``configuration["logFilePath"]`` and apply level/formatter."""
        log_path = configuration["logFilePath"]
        super().__init__(log_path)
        self.configuration = configuration
        setHandler(self, configuration)

    def format(self, record):
        """Return the formatted text produced by the base class."""
        formatted = super().format(record)
        return formatted

    def emit(self, record):
        """Tag *record* via ``addRecordMembers`` and write it to the file."""
        addRecordMembers(record, "fileHandler")
        super().emit(record)
class MyConsoleHandler(logging.StreamHandler):
    """Stream handler (default stream) configured from a config dict."""

    configuration: dict

    def __init__(self, configuration: dict):
        """Create the stream handler and apply level/formatter from config."""
        super().__init__()
        self.configuration = configuration
        setHandler(self, configuration)

    def format(self, record):
        """Return the formatted text produced by the base class."""
        formatted = super().format(record)
        return formatted

    def emit(self, record):
        """Tag *record* via ``addRecordMembers`` and write it to the stream."""
        addRecordMembers(record, "consoleHandler")
        super().emit(record)
class MyFormatter(logging.Formatter):
    """Formatter whose ``%(asctime)s`` reads ``YYYY-MM-DD HH:MM:SS,mmm``."""

    def __init__(self, *args, **kwargs):
        """Forward all arguments unchanged to ``logging.Formatter``."""
        super().__init__(*args, **kwargs)

    def formatTime(self, record, datefmt=None) -> str:
        """Return the record's creation time with milliseconds appended.

        *datefmt* is accepted for interface compatibility but not used; the
        layout is fixed.
        """
        moment = datetime.datetime.fromtimestamp(record.created)
        return "%s,%03d" % (moment.strftime('%Y-%m-%d %H:%M:%S'), record.msecs)
# Module-wide flag: becomes True once load() has attached at least one handler.
loaded = False


def load(logger: logging.Logger, config: dict) -> bool:
    """Attach the handlers named in *config* to *logger* (first call only).

    Recognised top-level keys are ``SQLiteLoggingHandler``, ``FileHandler``
    and ``ConsoleHandler``; each maps to that handler's own configuration
    dict.  Once the module-level ``loaded`` flag is True, further calls do
    nothing.

    Args:
        logger: logger that receives the handlers.
        config: mapping of handler name to handler configuration.

    Returns:
        The value of ``loaded``: True if any handler has been attached.
    """
    global loaded
    if loaded:
        return loaded

    if "SQLiteLoggingHandler" in config:
        sqlConfig = config["SQLiteLoggingHandler"]
        print(f'Add sqlite logging... dbFilePath: {sqlConfig["dbFilePath"]}')
        print(f'Add sqlite logging... tableName: {sqlConfig["tableName"]}')
        print(f'Add sqlite logging... loggingLevel: {sqlConfig["loggingLevel"]}')
        logger.addHandler(SQLiteLoggingHandler(sqlConfig))
        loaded = True

    if "FileHandler" in config:
        fileConfig = config["FileHandler"]
        print(f'Add file logging... logFilePath: {fileConfig["logFilePath"]}')
        print(f'Add file logging... loggingLevel: {fileConfig["loggingLevel"]}')
        # "formatter" is optional, so the status line must not index it
        # directly or a config without it would raise KeyError here.
        print(f'Add file logging... formatter: {fileConfig.get("formatter", "<default>")}')
        logger.addHandler(MyFileHandler(fileConfig))
        loaded = True

    if "ConsoleHandler" in config:
        consoleConfig = config["ConsoleHandler"]
        print(f'Add Console logging... loggingLevel: {consoleConfig["loggingLevel"]}')
        # Optional key, same reasoning as for the file handler.
        print(f'Add Console logging... formatter: {consoleConfig.get("formatter", "<default>")}')
        logger.addHandler(MyConsoleHandler(consoleConfig))
        loaded = True

    if not loaded:
        print(
            "SQLiteLoggingHandler or FileHandler or ConsoleHandler item not found in logging configuration.")
    return loaded
def test_main():
    """End-to-end check: log two messages and verify the SQLite rows.

    Stale database/log files in the temp directory are removed, all three
    handlers are attached to the root logger, ``Start`` and ``End`` are
    logged, and the ``monit_logging`` table is read back.

    Raises:
        AssertionError: if the stored rows are not exactly the two messages
            with ids 1 and 2.
    """
    temp_dir = tempfile.gettempdir()
    dbFilePath = os.path.join(temp_dir, "loggingHandlers.sqlite")
    logFilePath = os.path.join(temp_dir, "loggingHandlers.log")
    for stale in (dbFilePath, logFilePath):
        if os.path.exists(stale):
            os.remove(stale)

    config = {
        "ConsoleHandler": {
            "loggingLevel": "DEBUG",
            "formatter": "%(asctime)s;%(levelname)s;%(name)s;%(module)s;%(funcName)s;%(lineno)d;%(message)s;%(threadName)s"
        },
        "SQLiteLoggingHandler": {
            "dbFilePath": dbFilePath,
            "tableName": "monit_logging",
            "loggingLevel": "DEBUG"
        },
        "FileHandler": {
            "logFilePath": logFilePath,
            "loggingLevel": "DEBUG",
            "formatter": "%(asctime)s;%(levelname)s;%(name)s;%(module)s;%(funcName)s;%(lineno)d;%(message)s"
        },
    }
    print(dbFilePath)
    print(logFilePath)

    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    load(logger, config)

    # Two records through the root logger exercise every attached handler.
    logging.info('Start')
    logging.info('End')

    # Read the rows back; close the connection even if the query fails.
    conn = sqlite3.connect(dbFilePath)
    try:
        cursor = conn.execute("SELECT * FROM monit_logging")
        columnnames = [description[0] for description in cursor.description]
        rows = cursor.fetchall()
    finally:
        conn.close()
    print(columnnames)

    message_index = columnnames.index('Message')
    prints = [f"{columnnames[0]}: {r[0]}, {r[message_index]}" for r in rows]
    actual = "\n".join(prints)
    print(actual)

    expected = "logcheck_loggingID: 1, Start\nlogcheck_loggingID: 2, End"
    assert actual == expected, f"unexpected log rows: {actual!r}"


if __name__ == '__main__':
    test_main()