因此,我正在创建一个监视器应用程序,该应用程序会将芹菜任务中的日志发送到 ELK 堆栈。
到目前为止,我已经做到了这一点:
from project.celery import app
def monitor(app):
state = app.events.State()
def on_event_success(event):
state.event(event)
task = state.tasks.get(event['uuid'])
if task.name:
task_name = task.name
task_origin = task.origin
task_type = task.type
task_worker = task.worker
task_info = task.info()
task_log = "TASK NAME {}, TASK ORIGIN {}, TASK TYPE {}, TASK WORKER {}, TASK ARGS {}nn".format(task_name, task_origin, task_type, task_worker, task_info['args'])
print "SUCCESS: {}".format(task_log)
with app.connection() as connection:
recv = app.events.Receiver(connection, handlers={
'task-succeeded': on_event_success
})
recv.capture(limit=None, timeout=None, wakeup=True)
if __name__ == '__main__':
application = app
monitor(app)
使用此代码,我能够捕获任务中几乎所有可用的信息,但是我没有设法找到一种方法来捕获哪个队列生成了任务执行。
我有两个队列:
CELERY_QUEUES = (
# Persistent task queue
Queue('celery', routing_key='celery'),
# Persistent routine task queue
Queue('routine', routing_key='routine')
)
我想知道哪个队列发起了我的任务执行,从从事件创建的任务对象中获取此信息。
为此,您需要启用任务发送事件。
您还需要为task-sent
事件实现一个处理程序,就像您对task-succeeded
所做的那样。
监视应用程序应至少保留所有捕获的任务发送事件的任务 ID(event["uuid"]
(和路由键 (event["routing_key"]
。我使用缓存工具中的 TTLCache 执行此操作,当我需要路由键信息时,我使用任务成功和任务失败事件处理程序中的此字典。
如果你想要一个示例的任务名称和参数,你需要像我上面描述的一样处理task-received
事件......
您可能想知道为什么我使用 TTLCache - 我们的 Celery 集群每天运行数百万个任务,将所有任务发送的事件数据保存在内存中很快就会占用所有可用内存。
最后,下面是缓存任务发送的数据并在任务成功事件处理程序中使用它的代码:
from cachetools import TTLCache
from project.celery import app
def monitor(app):
state = app.events.State()
# keep a couple of days of history in case not acknowledged tasks are retried
task_info = TTLCache(float('inf'), 3.2 * 24 * 60 * 60)
def on_event_success(event):
nonlocal task_info
state.event(event)
task = state.tasks.get(event['uuid'])
if task.name:
task_name = task.name
task_origin = task.origin
task_type = task.type
task_worker = task.worker
t_info = task.info()
task_log = "TASK NAME {}, TASK ORIGIN {}, TASK TYPE {}, TASK WORKER {}, TASK ARGS {}".format(task_name, task_$
print("SUCCESS: {}".format(task_log))
if event["uuid"] in task_info:
cached_task = task_info[event["uuid"]]
if "routing_key" in cached_task:
print(" routing_key: {}nn".format(cached_task["routing_key"]))
def on_task_sent(event):
# task-sent(uuid, name, args, kwargs, retries, eta, expires, queue, exchange,
# routing_key, root_id, parent_id)
nonlocal task_info
if event["uuid"] not in task_info:
task_info[event["uuid"]] = {"name": event["name"],
"args": event["args"],
"queue": event["queue"],
"routing_key": event["routing_key"]}
with app.connection() as connection:
recv = app.events.Receiver(connection, handlers={
'task-succeeded': on_event_success,
"task-sent": on_task_sent,
"*": state.event
})
recv.capture(limit=None, timeout=None, wakeup=True)
if __name__ == '__main__':
application = app
monitor(app)
我从来没有足够的时间去调查芹菜的芹菜.events.state.State class。我知道它使用 LRUCache 来缓存一些条目,但我不确定是否可以使用它来代替我在代码中使用的 TTLCache......