这是在刷新用户的访问令牌的上下文中。假设我们有一个函数refresh_user_token
,它将CustomUser
对象作为user
。
def refresh_user_token(user):
...
...
return result
我希望针对特定CustomUser
每次执行此函数,以在 9 天内安排重复。
def refresh_user_token(user):
...
...
next_refresh = datetime.now() + timedelta(days=9)
schedule_refresh(user, scheduled_time=next_refresh)
return result
我看到的大多数 Celery 用例都是关于执行批处理操作的,但对于这种用途,我需要能够使用参数执行函数,这在 Celery 中似乎不可行。
有人确实建议设置一个 cron 作业来检查任何需要刷新 x 秒的令牌。
所以在CustomUser
对象上,我们有一个名为 last_token_refresh
的DateTimeField
。
@Celery.task
def refresh_auth_tokens():
users = CustomUser.objects.all()
for user in users:
last_refresh_delta = datetime.now(timezone.utc) - user.last_token_refresh
if last_refresh_delta.days >= 9:
refresh_user_token(user)
return True
else:
return False
这可以工作,但是当消息代理可用于仅安排所需的任务时,我觉得这非常费力。
您可以使用 Celery Beat 使用 crontab 来安排芹菜任务。只需创建常规的芹菜任务,并使用 Beat 说出何时运行。
这是我在一个项目中的芹菜节拍设置示例:
CELERY_BEAT_SCHEDULE = {
'populate_controller': {
'task': 'common.tasks.populate_controller',
# Will be executed Mondays, at 08:30
'schedule': crontab(day_of_week=2,
hour=8,
minute=30),
'options': {'queue': 'populate_controller'}
},
}