目前我有一个芹菜批正在运行django,比如:
Celery.py:
from __future__ import absolute_import, unicode_literals
import os
import celery
from celery import Celery
from celery.schedules import crontab
import django
load_dotenv(os.path.join(os.path.dirname(os.path.dirname(__file__)), '.env'))
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'base.settings')
django.setup()
app = Celery('base')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
app.control.purge()
sender.add_periodic_task(30.0, check_loop.s())
recursion_function.delay() #need to use recursive because it need to wait for loop to finish(time can't be predict)
print("setup_periodic_tasks")
@app.task()
def check_loop():
.....
start = database start number
end = database end number
callling apis in a list from id=start to id=end
create objects
update database(start number = end, end number = end + 3)
....
@app.task()
def recursion_function(default_retry_delay=10):
.....
do some looping
....
#when finished, call itself again
recursion_function.apply_async(countdown=30)
我的目标是,每当芹菜文件被编辑时,它都会重新启动所有尚未执行的任务-删除排队的任务(我这样做是因为recursion_function
在完成它检查数据库中表的每个记录的工作后会再次运行它自己,所以我不担心它中途停止(。
check_loop
函数将调用一个具有分页功能的api来返回对象列表,我将按表中的记录进行比较,如果匹配,则创建另一个模型的新自定义记录
我的问题是,当我清除所有消息时,当前正在运行的任务会中途停止还是继续运行?因为如果check_loop
函数在api列表的中途停止循环,那么它将再次运行循环,我将创建新的重复记录,而我不想要
示例:
在check_loop()
的调整任务中,它中途创建了对象(在从元素id=2到id=5的api列表上(,服务器重新启动->再次运行,现在check_loop()
从头开始运行(在从元件id=2到id=5的api名单上(,并再次从该列表创建对象(我100%不想要(
它是这样运行的吗?我只需要一个确认
编辑:
https://docs.celeryproject.org/en/4.4.1/faq.html#how-do-i-purge-all-waiting-tasks
我添加了app.control.purge()
,因为当我重新启动时,recursion_function
在setup_periodic_tasks
中再次被调用,而recursion_function.apply_async(countdown=30)
中的前一个recursion_function
也执行,所以它自己乘以
是,除非工作程序也重新启动,否则工作程序将继续执行当前正在运行的任务。
此外,Celery方式总是期望任务在并发环境中运行,并考虑以下因素:
- 有许多任务同时运行
- 有许多芹菜工人在执行任务
- 相同的任务可能会再次运行
- 同一任务的多个实例可能同时运行
- 任何任务都可以随时终止
即使您确信在您的环境中只有一个工作程序手动启动/停止,并且这些都不适用-任务的创建方式应该允许所有这些事情发生。
一些有用的技术:
- 使用数据库事务
- 使用锁定
- 将长时间运行的任务拆分为更快的任务
- 如果任务有要保存的中间值,或者它们很重要(例如,像某些api调用一样不可复制(,并且下一步处理这些值需要时间,则考虑将其拆分为几个链式任务
如果您一次只需要运行一个任务的实例-使用某种锁定-在数据库或缓存中创建/更新锁定记录,以便其他任务(相同的任务(可以检查并知道此任务正在运行,只需返回或等待前一个任务完成。
即recursion_function
也可以是Periodic Task
。作为周期性任务将确保它在每个间隔运行,即使前一个任务由于任何原因失败(因此无法像常规非周期性任务那样再次排队(。通过锁定,您可以确保一次只运行一个。
check_loop()
:
首先,建议将结果保存在数据库中的一个事务中,以确保在数据库中保存或不修改所有结果。
您还可以保存一些标记,指示已保存对象的数量/状态,这样以后的任务就可以只检查该标记,而不是每个对象。
或者,在创建每个元素之前,以某种方式对其进行检查,确认其已存在于数据库中。
我不会写一篇像奥列格上面的优秀文章那样的文章。答案很简单-所有正在运行的任务都将继续运行。purge
是关于队列中等待Celery工作人员挑选的所有任务。