How to enforce the logger format during Celery task execution



I have a service that uses the Python logging module to log debug messages.

my_service.py:

import logging

logger = logging.getLogger(__name__)


class SomeService:
    def synchronize(self):
        logger.debug('synchronizing stuff')
        external_library.call('do it')
        logger.debug('found x results')

Then I use this service from a Celery task:

tasks.py:

from celery import shared_task


@shared_task
def synchronize_stuff():
    stuff = some_service.synchronize()

The worker then outputs logs like this:

worker_1     | [2019-01-22 11:39:19,232: DEBUG/MainProcess] Task accepted: my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8] pid:12
worker_1     | [2019-01-22 11:39:19,237: DEBUG/ForkPoolWorker-1] Starting new HTTPS connection (1): example.com:443
worker_1     | [2019-01-22 11:39:19,839: DEBUG/ForkPoolWorker-1] https://example.com:443 "GET /api/stuff HTTP/1.1" 200 None
worker_1     | [2019-01-22 11:39:19,860: DEBUG/ForkPoolWorker-1] Processing 35
worker_1     | [2019-01-22 11:39:19,862: DEBUG/ForkPoolWorker-1] Item 35 already closed, ignoring.
worker_1     | [2019-01-22 11:39:19,863: DEBUG/ForkPoolWorker-1] Processing 36
worker_1     | [2019-01-22 11:39:19,865: DEBUG/ForkPoolWorker-1] Item 36 already closed, ignoring.
worker_1     | [2019-01-22 11:39:19,865: DEBUG/ForkPoolWorker-1] Processing 49
worker_1     | [2019-01-22 11:39:20,380: DEBUG/ForkPoolWorker-1] https://example.com:443 "GET /api/detail/49 HTTP/1.1" 200 None
worker_1     | [2019-01-22 11:39:20,429: DEBUG/ForkPoolWorker-1] Processing 50
worker_1     | [2019-01-22 11:39:20,680: DEBUG/ForkPoolWorker-1] https://example.com:443 "GET /api/detail/50 HTTP/1.1" 200 None
worker_1     | [2019-01-22 11:39:20,693: DEBUG/ForkPoolWorker-1] Processing 51
worker_1     | [2019-01-22 11:39:21,138: DEBUG/ForkPoolWorker-1] https://example.com:443 "GET /api/detail/51 HTTP/1.1" 200 None
worker_1     | [2019-01-22 11:39:21,197: INFO/ForkPoolWorker-1] Task my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8] succeeded in 1.9656380449960125s: None

This is good enough for debugging, but I'd like these logs to include the task name and uuid. That can be achieved by using the Celery task logger, like this:

my_service.py:

from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)


class SomeService:
    def synchronize(self):
        logger.debug('synchronizing stuff')
        external_library.call('do it')
        logger.debug('found x results')

Which gives exactly the logging I want:

worker_1     | [2019-01-22 11:39:19,232: DEBUG/MainProcess] Task accepted: my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8] pid:12
worker_1     | [2019-01-22 11:39:19,237: DEBUG/ForkPoolWorker-1] Starting new HTTPS connection (1): example.com:443
worker_1     | [2019-01-22 11:39:19,839: DEBUG/ForkPoolWorker-1] https://example.com:443 "GET /api/stuff HTTP/1.1" 200 None
worker_1     | [2019-01-22 11:39:19,860: DEBUG/ForkPoolWorker-1] my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8]: Processing 35
worker_1     | [2019-01-22 11:39:19,862: DEBUG/ForkPoolWorker-1] my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8]: Item 35 already closed, ignoring.
worker_1     | [2019-01-22 11:39:19,863: DEBUG/ForkPoolWorker-1] my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8]: Processing 36
worker_1     | [2019-01-22 11:39:19,865: DEBUG/ForkPoolWorker-1] my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8]: Item 36 already closed, ignoring.
worker_1     | [2019-01-22 11:39:19,865: DEBUG/ForkPoolWorker-1] my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8]: Processing 49
worker_1     | [2019-01-22 11:39:20,380: DEBUG/ForkPoolWorker-1] https://example.com:443 "GET /api/detail/49 HTTP/1.1" 200 None
worker_1     | [2019-01-22 11:39:20,429: DEBUG/ForkPoolWorker-1] my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8]: Processing 50
worker_1     | [2019-01-22 11:39:20,680: DEBUG/ForkPoolWorker-1] https://example.com:443 "GET /api/detail/50 HTTP/1.1" 200 None
worker_1     | [2019-01-22 11:39:20,693: DEBUG/ForkPoolWorker-1] my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8]: Processing 51
worker_1     | [2019-01-22 11:39:21,138: DEBUG/ForkPoolWorker-1] https://example.com:443 "GET /api/detail/51 HTTP/1.1" 200 None
worker_1     | [2019-01-22 11:39:21,197: INFO/ForkPoolWorker-1] Task my_task_name[48d706d7-0d92-43aa-aa9d-d5db8d660af8] succeeded in 1.9656380449960125s: None

But I have two problems with it:

  1. I don't want to use the Celery logger inside the service. The service should remain usable in environments where Celery isn't installed at all (in that case it's fine for the task name and uuid to be missing from the logs). A guarded import would cover this point; see the sketch after this list.

  2. Logs emitted by external libraries during the same task don't go through that logger, so the task name and uuid are not included for them either.
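
For reference, point 1 on its own could be handled with a guarded import in the service module (a sketch; it does nothing for point 2):

import logging

try:
    # Use Celery's task-aware logger when Celery is available ...
    from celery.utils.log import get_task_logger
    logger = get_task_logger(__name__)
except ImportError:
    # ... and fall back to plain stdlib logging when it isn't.
    logger = logging.getLogger(__name__)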

Which brings me to the question: is it possible to specify (force) a logger at the task level (in tasks.py) that would be used no matter how I log in my service, or how external libraries log? Something like this would be fine:

tasks.py:

@shared_task
def synchronize_stuff():
    logging.enforce_logger(get_task_logger(__name__))
    stuff = some_service.synchronize()
    logging.restore_logger()

It is also worth noting that I use Django in this project.

Thanks!

It's not exactly what you're looking for, but I ran into a similar problem and solved it with a logging filter, applied to the handler that logs to the service where I don't want Celery's log messages. I described my problem and my solution in this question: How can I log from my python application to splunk, if I use celery as my task scheduler?
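
The core of it is a logging.Filter attached to that handler; roughly like this (a sketch only — the filter class and the StreamHandler stand-in are my illustration here, not the exact code from the linked answer):

import logging


class ExcludeCeleryFilter(logging.Filter):
    """Drop records produced by Celery's own 'celery*' loggers."""
    def filter(self, record):
        return not record.name.startswith('celery')


# Stand-in for the handler that ships logs to the external service
service_handler = logging.StreamHandler()
service_handler.addFilter(ExcludeCeleryFilter())
logging.getLogger().addHandler(service_handler)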

Let me know if that points you in the right direction...

Also, I got very good results using Python's logging.dictConfig!

The solution proposed by Martin Janeček is the only one I've found that works for me:

from logging import Filter

from celery.signals import setup_logging


class CeleryTaskFilter(Filter):
    """Only pass records emitted from a Celery worker process."""
    def filter(self, record):
        return record.processName.find("Worker") != -1


celery_log_config = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "celeryTask": {
            # TaskFormatter fills in %(task_name)s and %(task_id)s
            "()": "celery.app.log.TaskFormatter",
            "fmt": "[%(asctime)s: %(levelname)s/%(processName)s] %(task_name)s[%(task_id)s]:%(module)s:%(funcName)s: %(message)s",
        },
    },
    "filters": {
        "celeryTask": {
            "()": CeleryTaskFilter,
        },
    },
    "handlers": {
        "console": {
            "level": "INFO",
            "class": "logging.StreamHandler",
            "formatter": "celeryTask",
            "filters": ["celeryTask"],
        },
    },
    "loggers": {
        # Route everything through the filtered console handler
        "": {
            "handlers": ["console"],
            "level": "DEBUG",
            "propagate": False,
        }
    },
}
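
The processName filter matters here: celery.app.log.TaskFormatter substitutes '???' for %(task_name)s and %(task_id)s on records that aren't emitted from within a task, so without the filter every non-task log line would be rendered with those placeholders.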

Then I just had to make sure this is set up when Celery initializes:

from logging.config import dictConfig


@setup_logging.connect
def configure_logging(**_kwargs):
    # Named differently from the signal to avoid shadowing the import above
    dictConfig(celery_log_config)
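
Connecting a handler to the setup_logging signal also means Celery skips its own logger configuration entirely, so the dictConfig above stays the single source of truth for the worker's log output.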

Note that I also filter on record.processName == 'MainProcess' with a second handler using Celery's DEFAULT_PROCESS_LOG_FMT, otherwise we would lose those logs. They are the only logs I get from my Celery container, but I can easily imagine other use cases also needing a record.processName != 'MainProcess' and record.processName.find('Worker') == -1 filter. A sketch of that MainProcess counterpart follows.
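
For illustration, it could look roughly like this (a sketch; the filter class and handler name are my own, and in practice these entries would be merged into celery_log_config rather than configured separately):

from logging import Filter
from logging.config import dictConfig


class CeleryMainProcessFilter(Filter):
    """Only pass records emitted by Celery's MainProcess."""
    def filter(self, record):
        return record.processName == "MainProcess"


main_process_log_config = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "celeryProcess": {
            # Celery's DEFAULT_PROCESS_LOG_FMT: MainProcess records carry
            # no task_name/task_id, so the plain format is used instead.
            "format": "[%(asctime)s: %(levelname)s/%(processName)s] %(message)s",
        },
    },
    "filters": {
        "celeryProcess": {
            "()": CeleryMainProcessFilter,
        },
    },
    "handlers": {
        "consoleMain": {
            "level": "INFO",
            "class": "logging.StreamHandler",
            "formatter": "celeryProcess",
            "filters": ["celeryProcess"],
        },
    },
    "loggers": {
        "": {"handlers": ["consoleMain"], "level": "DEBUG", "propagate": False},
    },
}

dictConfig(main_process_log_config)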
