如何动态添加/删除定期任务到芹菜(芹菜节拍)



如果我有一个定义如下的函数:

def add(x,y):
  return x+y

有没有办法将这个函数动态添加为芹菜周期任务并在运行时启动它?我希望能够做类似的事情(伪代码):

some_unique_task_id = celery.beat.schedule_task(add, run_every=crontab(minute="*/30"))
celery.beat.start(some_unique_task_id)

我还想使用类似(伪代码)的东西动态停止或删除该任务:

celery.beat.remove_task(some_unique_task_id)

celery.beat.stop(some_unique_task_id)

仅供参考,我没有使用 djcelery,它可以让您通过 django 管理员管理定期任务。

这个问题在谷歌群组上得到了回答。

我不是作者,所有的功劳都归于让·马克

这是一个适当的解决方案。确认工作,在我的场景中, 我子类化了定期任务并从中创建了一个模型,因为我可以 根据需要向模型添加其他字段,以便我可以添加 "终止"方法。您必须设置启用定期任务 属性,并在删除之前将其保存。整体 子类化不是必须的,schedule_every方法是 真的做了工作。当您准备好终止任务时(如果您 没有子类它)你可以只使用 PeriodicTask.objects.filter(name=...) 要搜索您的任务,请禁用 它,然后删除它。

希望这有帮助!

from djcelery.models import PeriodicTask, IntervalSchedule
from datetime import datetime
class TaskScheduler(models.Model):
    periodic_task = models.ForeignKey(PeriodicTask)
    @staticmethod
    def schedule_every(task_name, period, every, args=None, kwargs=None):
    """ schedules a task by name every "every" "period". So an example call would be:
         TaskScheduler('mycustomtask', 'seconds', 30, [1,2,3]) 
         that would schedule your custom task to run every 30 seconds with the arguments 1,2 and 3 passed to the actual task. 
    """
        permissible_periods = ['days', 'hours', 'minutes', 'seconds']
        if period not in permissible_periods:
            raise Exception('Invalid period specified')
        # create the periodic task and the interval
        ptask_name = "%s_%s" % (task_name, datetime.datetime.now()) # create some name for the period task
        interval_schedules = IntervalSchedule.objects.filter(period=period, every=every)
        if interval_schedules: # just check if interval schedules exist like that already and reuse em
            interval_schedule = interval_schedules[0]
        else: # create a brand new interval schedule
            interval_schedule = IntervalSchedule()
            interval_schedule.every = every # should check to make sure this is a positive int
            interval_schedule.period = period 
            interval_schedule.save()
        ptask = PeriodicTask(name=ptask_name, task=task_name, interval=interval_schedule)
        if args:
            ptask.args = args
        if kwargs:
            ptask.kwargs = kwargs
        ptask.save()
        return TaskScheduler.objects.create(periodic_task=ptask)
    def stop(self):
        """pauses the task"""
        ptask = self.periodic_task
        ptask.enabled = False
        ptask.save()
    def start(self):
        """starts the task"""
        ptask = self.periodic_task
        ptask.enabled = True
        ptask.save()
    def terminate(self):
        self.stop()
        ptask = self.periodic_task
        self.delete()
        ptask.delete()

这最终是通过芹菜 v4.1.0 中包含的修复来实现的。现在,你只需要在数据库后端更改调度条目,芹菜节拍就会根据新的调度来行动。

文档模糊地描述了它是如何工作的。芹菜节拍的默认调度程序 PersistentScheduler 使用搁置文件作为其调度数据库。对PersistentScheduler实例中beat_schedule字典所做的任何更改都将与此数据库同步(默认情况下,每 3 分钟同步一次),反之亦然。这些文档介绍了如何使用 app.add_periodic_taskbeat_schedule添加新条目。要修改现有条目,只需添加一个具有相同name的新条目。像从字典中删除条目一样:del app.conf.beat_schedule['name']

假设您想使用外部应用程序监控和修改芹菜节拍时间表。然后,您有几种选择:

  1. 您可以像字典一样open搁置数据库文件并读取其内容。写回此文件进行修改。
  2. 可以运行 Celery 应用的另一个实例,并使用该实例修改搁置文件,如上所述。
  3. 你可以使用 django-celery-beat 中的自定义调度程序类将调度存储在 django 管理的数据库中,并访问那里的条目。
  4. 您可以使用 celerybeat-mongo 中的调度程序将调度存储在 MongoDB 后端中,并访问那里的条目。

不,对不起,这在常规的芹菜节拍中是不可能的。

但是做你想做的事很容易扩展,例如django-celery。调度程序只是一个子类,读取调度并将其写入数据库(顶部有一些优化)。

你也可以使用 django-celery 调度器,即使是非 Django 项目。

像这样:

    安装 django
  • + django-celery:

    $ pip install -U django django-celery

  • 将以下设置添加到您的芹菜配置:

    DATABASES = {
        'default': {
            'NAME': 'celerybeat.db',
            'ENGINE': 'django.db.backends.sqlite3',
        },
    }
    INSTALLED_APPS = ('djcelery', )
    
  • 创建数据库表:

    $ PYTHONPATH=. django-admin.py syncdb --settings=celeryconfig
    
  • 使用数据库调度程序启动芹菜节拍:

    $ PYTHONPATH=. django-admin.py celerybeat --settings=celeryconfig 
        -S djcelery.schedulers.DatabaseScheduler
    

还有djcelerymon命令,可用于非Django项目要在同一进程中启动 celerycam 和 Django 管理网络服务器,您可以使用它还可以在一个漂亮的 Web 界面中编辑您的定期任务:

   $ djcelerymon

(注意由于某种原因,djcelerymon无法使用Ctrl + C停止,您必须使用 Ctrl+Z + kill %1)

有一个名为django-celery-beat的库,它提供了人们需要的模型。要使其动态加载新的定期任务,必须创建自己的调度程序。

from django_celery_beat.schedulers import DatabaseScheduler

class AutoUpdateScheduler(DatabaseScheduler):
    def tick(self, *args, **kwargs):
        if self.schedule_changed():
            print('resetting heap')
            self.sync()
            self._heap = None
            new_schedule = self.all_as_schedule()
            if new_schedule:
                to_add = new_schedule.keys() - self.schedule.keys()
                to_remove = self.schedule.keys() - new_schedule.keys()
                for key in to_add:
                    self.schedule[key] = new_schedule[key]
                for key in to_remove:
                    del self.schedule[key]
        super(AutoUpdateScheduler, self).tick(*args, **kwargs)
    @property
    def schedule(self):
        if not self._initial_read and not self._schedule:
            self._initial_read = True
            self._schedule = self.all_as_schedule()
        return self._schedule

您可以查看这个 flask-djcelery,它配置了 flask 和 djcelery,还提供了可浏览的 rest api

我一直在为Celery + Redis寻找可以灵活添加/删除的相同解决方案。看看这个,redbeat,来自Heroku的同一个家伙,甚至他们也放了Redis + Sentinel。

希望帮助:)

@asksol的答案是 Django 应用程序中需要什么。

对于非 django 应用程序,您可以使用 celery-sqlalchemy-scheduler 它像 Django 的 django-celery-beat 一样建模,因为它也使用数据库而不是文件celerybeat-schedule

  • https://pypi.org/project/celery-sqlalchemy-scheduler/
  • https://github.com/AngelLiang/celery-sqlalchemy-scheduler

下面是运行时添加新任务的示例。

tasks.py

from celery import Celery
celery = Celery('tasks')
beat_dburi = 'sqlite:///schedule.db'
celery.conf.update(
    {'beat_dburi': beat_dburi}
)

@celery.task
def my_task(arg1, arg2, be_careful):
    print(f"{arg1} {arg2} be_careful {be_careful}")

日志(生产者)

$ celery --app=tasks beat --scheduler=celery_sqlalchemy_scheduler.schedulers:DatabaseScheduler --loglevel=INFO
celery beat v5.1.2 (sun-harmonics) is starting.
[2021-08-20 15:20:20,927: INFO/MainProcess] beat: Starting...

日志(使用者)

$ celery --app=tasks worker --queues=celery --loglevel=INFO
-------------- celery@ubuntu20 v5.1.2 (sun-harmonics)
[2021-08-20 15:20:02,287: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//

数据库计划

$ sqlite3 schedule.db 
sqlite> .databases
main: /home/nponcian/Documents/Program/1/db/schedule.db
sqlite> .tables
celery_crontab_schedule       celery_periodic_task_changed
celery_interval_schedule      celery_solar_schedule       
celery_periodic_task        
sqlite> select * from celery_periodic_task;
1|celery.backend_cleanup|celery.backend_cleanup||1||[]|{}|||||2021-08-20 19:20:20.955246|0||1||0|2021-08-20 07:20:20|

现在,当这些工作线程已在运行时,让我们通过添加新的计划任务来更新计划。请注意,这是在运行时进行的,无需重新启动工作线程。

$ python3
>>> # Setup the session.
>>> from celery_sqlalchemy_scheduler.models import PeriodicTask, IntervalSchedule
>>> from celery_sqlalchemy_scheduler.session import SessionManager
>>> from tasks import beat_dburi
>>> session_manager = SessionManager()
>>> engine, Session = session_manager.create_session(beat_dburi)
>>> session = Session()
>>> 
>>> # Setup the schedule (executes every 10 seconds).
>>> schedule = session.query(IntervalSchedule).filter_by(every=10, period=IntervalSchedule.SECONDS).first()
>>> if not schedule:
...     schedule = IntervalSchedule(every=10, period=IntervalSchedule.SECONDS)
...     session.add(schedule)
...     session.commit()
... 
>>> 
>>> # Create the periodic task
>>> import json
>>> periodic_task = PeriodicTask(
...     interval=schedule,                  # we created this above.
...     name='My task',                     # simply describes this periodic task.
...     task='tasks.my_task',               # name of task.
...     args=json.dumps(['arg1', 'arg2']),
...     kwargs=json.dumps({
...        'be_careful': True,
...     }),
... )
>>> session.add(periodic_task)
>>> session.commit()

数据库计划(已更新)

  • 现在我们可以看到,新添加的计划已反映到由芹菜节拍计划程序连续读取的数据库中。因此,如果 args 或 kwarg 的值有任何更新,我们可以轻松地对数据库执行 SQL 更新,并且它应该与正在运行的工作线程实时反映(无需重新启动)。
sqlite> select * from celery_periodic_task;
1|celery.backend_cleanup|celery.backend_cleanup||1||[]|{}|||||2021-08-20 19:20:20.955246|0||1||0|2021-08-20 07:20:20|
2|My task|tasks.my_task|1|||["arg1", "arg2"]|{"be_careful": true}||||||0||1||0|2021-08-20 07:26:49|

日志(生产者)

  • 现在,新任务每 10 秒排队一次
[2021-08-20 15:26:51,768: INFO/MainProcess] DatabaseScheduler: Schedule changed.
[2021-08-20 15:26:51,768: INFO/MainProcess] Writing entries...
[2021-08-20 15:27:01,789: INFO/MainProcess] Scheduler: Sending due task My task (tasks.my_task)
[2021-08-20 15:27:11,776: INFO/MainProcess] Scheduler: Sending due task My task (tasks.my_task)
[2021-08-20 15:27:21,791: INFO/MainProcess] Scheduler: Sending due task My task (tasks.my_task)

日志(使用者)

  • 新添加的任务每 10 秒按时正确执行一次
[2021-08-20 15:27:01,797: INFO/MainProcess] Task tasks.my_task[04dcb40c-0a77-437b-a129-57eb52850a51] received
[2021-08-20 15:27:01,798: WARNING/ForkPoolWorker-4] arg1 arg2 be_careful True
[2021-08-20 15:27:01,799: WARNING/ForkPoolWorker-4] 
[2021-08-20 15:27:01,799: INFO/ForkPoolWorker-4] Task tasks.my_task[04dcb40c-0a77-437b-a129-57eb52850a51] succeeded in 0.000763321000704309s: None
[2021-08-20 15:27:11,783: INFO/MainProcess] Task tasks.my_task[e8370a6b-085f-4bd5-b7ad-8f85f4b61908] received
[2021-08-20 15:27:11,786: WARNING/ForkPoolWorker-4] arg1 arg2 be_careful True
[2021-08-20 15:27:11,786: WARNING/ForkPoolWorker-4] 
[2021-08-20 15:27:11,787: INFO/ForkPoolWorker-4] Task tasks.my_task[e8370a6b-085f-4bd5-b7ad-8f85f4b61908] succeeded in 0.0006725780003762338s: None
[2021-08-20 15:27:21,797: INFO/MainProcess] Task tasks.my_task[c14d875d-7f6c-45c2-a76b-4e9483273185] received
[2021-08-20 15:27:21,799: WARNING/ForkPoolWorker-4] arg1 arg2 be_careful True
[2021-08-20 15:27:21,799: WARNING/ForkPoolWorker-4] 
[2021-08-20 15:27:21,800: INFO/ForkPoolWorker-4] Task tasks.my_task[c14d875d-7f6c-45c2-a76b-4e9483273185] succeeded in 0.0006371149993356084s: None

前段时间我需要动态更新 Celery 和 Django 中的周期性任务,我写了一篇关于我的方法的文章(文章代码)。

我使用的是django-celery-beat包。它为PeriodicTaskIntervalSchedule提供了数据库模型。通过操作PeriodicTask对象,您可以在 Celery 中添加/删除/更新/暂停定期任务。

创建定期任务

from django_celery_beat.models import IntervalSchedule, PeriodicTask
schedule, created = IntervalSchedule.objects.get_or_create(
    every=instance.interval,
    period=IntervalSchedule.SECONDS,
)
task = PeriodicTask.objects.create(
    interval=schedule,
    name=f"Monitor: {instance.endpoint}",
    task="monitors.tasks.task_monitor",
    kwargs=json.dumps(
        {
            "monitor_id": instance.id,
        }
    ),
)

删除定期任务

PeriodicTask.objects.get(pk=task_id).delete()

更改定期任务中的间隔

task = PeriodicTask.objects.get(pk=your_id)
schedule, created = IntervalSchedule.objects.get_or_create(
    every=new_interval,
    period=IntervalSchedule.SECONDS,
)
task.interval = schedule
task.save()

暂停定期任务

task = PeriodicTask.objects.get(pk=your_id)
task.enabled = false
task.save()

节拍服务

使用 django-celery-beat 时,您需要在启动节拍服务时传递调度程序参数:

celery -A backend beat -l INFO --scheduler django_celery_beat.schedulers:DatabaseScheduler --max-interval 10

Celery 可以实现数据库和调用自身的动态周期任务。

但是APSchdule更好。

因为动态周期性任务总是意味着长时间的倒计时或预计时间。这些定期任务中的太多会占用大量内存,因此重新启动和执行非延迟任务非常耗时。

tasks.py

import sqlite3
from celery import Celery
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
app = Celery(
    'tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1',
    imports=['tasks'],
)
conn = sqlite3.connect('database.db', check_same_thread=False)
c = conn.cursor()
sql = '''
CREATE TABLE IF NOT EXISTS `tasks` 
(
   `id` INTEGER UNIQUE PRIMARY KEY AUTOINCREMENT,
   `name` TEXT,
   `countdown` INTEGER
);
'''
c.execute(sql)

def create(name='job', countdown=5):
    sql = 'INSERT INTO `tasks` (`name`, `countdown`) VALUES (?, ?)'
    c.execute(sql, (name, countdown))
    conn.commit()
    return c.lastrowid

def read(id=None, verbose=False):
    sql = 'SELECT * FROM `tasks` '
    if id:
        sql = 'SELECT * FROM `tasks` WHERE `id`={}'.format(id)
    all_rows = c.execute(sql).fetchall()
    if verbose:
        print(all_rows)
    return all_rows

def update(id, countdown):
    sql = 'UPDATE `tasks` SET `countdown`=? WHERE `id`=?'
    c.execute(sql, (countdown, id))
    conn.commit()

def delete(id, verbose=False):
    sql = 'DELETE FROM `tasks` WHERE `id`=?'
    affected_rows = c.execute(sql, (id,)).rowcount
    if verbose:
        print('deleted {} rows'.format(affected_rows))
    conn.commit()

@app.task
def job(id):
    id = read(id)
    if id:
        id, name, countdown = id[0]
    else:
        logger.info('stop')
        return
    logger.warning('id={}'.format(id))
    logger.warning('name={}'.format(name))
    logger.warning('countdown={}'.format(countdown))
    job.apply_async(args=(id,), countdown=countdown)

main.py

from tasks import *
id = create(name='job', countdown=5)
job(id)
# job.apply_async((id,), countdown=5)  # wait 5s
print(read())
input('enter to update')
update(id, countdown=1)
input('enter to delete')
delete(id, verbose=True)

相关内容

  • 没有找到相关文章