在 Django 视图中使用 Celery 定期任务输出,这些视图使用 django_celery_beat 和 Cac



我正在尝试使用Celery在我的一个模型上执行一个相当消耗的算法。 目前在我的 home.tasks.py 我有:

@shared_task(bind=True)
def get_hot_posts():
return Post.objects.get_hot()


@shared_task(bind=True)
def get_top_posts():
pass

在我的Post对象模型管理器中,我有:

def get_hot(self):

qs = (
self.get_queryset()
.select_related("author")
)

qs_list = list(qs)
sorted_post = sorted(qs_list, key=lambda p: p.hot(), reverse=True)

return sorted_post

返回热帖的列表对象。

我使用django_celery_beat来设置定期任务。我在settings.py中配置了

CELERY_BEAT_SCHEDULE = {
'update-hot-posts': {
'task':'get_hot_posts',
'schedule': 3600.0
},
'update-top-posts': {
'task':'get_top_posts',
'schedule': 86400
}
}

我不知道是否可以在 Celery 任务中对我的模型执行任何功能,但我的目的是每 1 小时计算一次热门帖子,然后简单地在我的一个视图中使用它。我如何实现这一点,我无法找到如何获取该任务的输出并在我的视图中使用它以便在我的模板中呈现它。

提前感谢!

编辑

我现在正在缓存结果:

settings.py:

CACHES = {
"default": {
"BACKEND": "django_redis.cache.RedisCache",
"LOCATION": "redis://127.0.0.1:6379/1",
"OPTIONS": {
"CLIENT_CLASS": "django_redis.client.DefaultClient",
"IGNORE_EXCEPTIONS": True,

}
}
}

CACHE_TTL = getattr(settings, 'CACHE_TTL', DEFAULT_TIMEOUT(

@shared_task(bind=True)
def get_hot_posts():
hot_posts = Post.objects.get_hot()
cache.set("hot_posts", hot_posts, timeout=CACHE_TTL)

但是,当我访问我视图中的对象时,它返回 None,似乎我的任务不起作用。

@login_required
def hot_posts(request):
posts = cache.get("hot_posts")
context = { 'posts':posts, 'hot_active':'-active'}
return render(request, 'home/homepage/home.html', context)

如何检查我的任务是否正常运行?它实际上正在工作并缓存查询集函数。

编辑settings.py中的配置:

BROKER_URL = 'redis://localhost:6379'
BROKER_TRANSPORT = 'redis'
CELERY_RESULT_BACKEND = 'redis://localhost:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_BEAT_SCHEDULE = {
'update-hot-posts': {
'task':'get_hot_posts',
'schedule': 3600.0
},
'update-top-posts': {
'task':'get_top_posts',
'schedule': 86400.0
},
'tester': {
'task':'tester',
'schedule': 60.0
}
}

当我转到我的视图并返回 None 时,我没有看到和结果cache.get我认为我的任务没有运行,但我找不到原因。

这是我运行辅助角色时发生的情况:

芹菜 -A 注册工作者 -E --loglevel=info

-------------- celery@apples-MacBook-Pro-2.local v4.4.6 (cliffs)
--- ***** ----- 
-- ******* ---- Darwin-16.7.0-x86_64-i386-64bit 2020-07-06 01:46:36
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         register:0x10f3da050
- ** ---------- .> transport:   redis://localhost:6379//
- ** ---------- .> results:     redis://localhost:6379/
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: ON
--- ***** ----- 
-------------- [queues]
.> celery           exchange=celery(direct) key=celery

[tasks]
. home.tasks.get_hot_posts
. home.tasks.get_top_posts
. home.tasks.tester
[2020-07-06 01:46:38,449: INFO/MainProcess] Connected to redis://localhost:6379//
[2020-07-06 01:46:38,500: INFO/MainProcess] mingle: searching for neighbors
[2020-07-06 01:46:39,592: INFO/MainProcess] mingle: all alone
[2020-07-06 01:46:39,650: INFO/MainProcess] celery@apples-MacBook-Pro-2.local ready.

同样对于启动节拍,我使用:

celery -A register beat -l INFO --scheduler django_celery_beat.schedulers:DatabaseScheduler

我的建议是你改变你的模型并使其可标记。 也许是这样:https://django-taggit.readthedocs.io/

完成此操作后,您可以修改计算热门帖子的芹菜作业。 计算出新的热门帖子后,您可以从所有现有帖子中删除所有"热门"标签,然后使用"热门"标签标记新热帖子。

然后,您的视图代码可以简单地过滤带有热标签的帖子。

编辑

如果要确保代码实际执行,可以使用扩展来执行此操作。 例如,django-celery-results后端会将你的@shared_task返回的任何数据(通常是JSON,如果这是你的消息编码(以及时间戳甚至输入参数一起存储在数据库中。 因此,您可以查看任务是否按预期运行。

https://docs.celeryproject.org/en/stable/django/first-steps-with-django.html#django-celery-results-using-the-django-orm-cache-as-a-result-backend

你也可以考虑django-celery-beat,以确保你有一个很好的可视化方式通过django管理员查看工作时间表。

https://docs.celeryproject.org/en/stable/django/first-steps-with-django.html#django-celery-beat-database-backed-periodic-tasks-with-admin-interface

编辑 2

如果您打算使用数据库调度程序(强烈推荐!(,那么您需要登录到管理员并按照您想要的计划添加任务。

https://pinoylearnpython.com/wp-content/uploads/2019/04/Django-Celery-Beat-on-Admin-Site-Pinoy-Learn-Python-1024x718.jpg

编辑 3

在您的 settings.py

CELERY_BEAT_SCHEDULE = {
'update-hot-posts': {
'task':'get_hot_posts',
'schedule': 3600.0
},
'update-top-posts': {
'task':'get_top_posts',
'schedule': 86400.0
},
'tester': {
'task':'tester',
'schedule': 60.0
}
}

第三个任务称为tester,应该每60秒运行一次。 我在你的任务中看不到这一点。 因为您尝试安排一个未在任何地方定义为@shared_task的任务,所以芹菜变得困惑并为您提供有关tester的错误消息。

相关内容

  • 没有找到相关文章

最新更新