如何使用 Celery 安排在每月特定日期运行的任务



我尝试使用此解决方案,但没有成功。如何指定应在每月执行任务的日期?我的解决方案是

class DayOfMonth(schedule):
    def __init__(self, day=1):
        self.day = day
    def is_due(self, last_run_at):
        now = datetime.datetime.now()
        if now.month != last_run_at.month and now.day == self.day:
            return True, 3000
        return False, 3000
    def __eq__(self, other):
            if isinstance(other, DayOfMonth):
                return self.day == other.day and self.month == other.month
            return False

我尝试使用 django-celery 运行它,但我仍然收到未指定run_every的错误。

编辑 1:

运行我的任务添加:

"my_task": {
        "task": "util.tasks.CeleryManagementCommand",
        "schedule": DayOfMonth(day=4),
        "args": ('my_task',),
    },

CELERYBEAT_SHEDULE字典

编辑2:

当我在 init 中指定run_every时> self.run_every = None 收到一个错误,即 None 类型对象没有属性total_seconds

如果你子类,并更改 init,你最好确保调用父级的 init。我很确定这将解决您的问题:

class DayOfMonth(schedule):
    def __init__(self, day=1, *args, **kwargs):
        super(DayOfMonth, self).__init__(*args, **kwargs)
        self.day = day

如果您检查这一点,它将揭示您遇到的错误:https://github.com/ask/celery/blob/master/celery/schedules.py#L33

最好将其包含在 celery.py 文件中:

"schedule": corntab(day_of_month=4),

同时导入玉米片:

from celery.schedules import crontab

相关内容

  • 没有找到相关文章

最新更新