Django Celery:计时任务没有运行



在Django应用程序中,我有一个表单,可以安排发送电子邮件。它有四个字段:名称、电子邮件、正文、发送日期。我想动态创建一个Celery任务(电子邮件(,以便在指定时间运行另一个Celary任务。

我已经能够使用以下代码根据表格定期(每30秒(发送电子邮件:

schedule, _ = IntervalSchedule.objects.update_or_create(every=30, period=IntervalSchedule.SECONDS)
@shared_task(name="schedule_interval_email")
def schedule_email_interval(name, email, body, send_date):
PeriodicTask.objects.update_or_create(
defaults={
"interval": schedule,
"task": "email_task"
},
name="Send message at interval",
args=json.dumps(['name', 'test@test.com', 'body']),
)

然而,当我尝试通过ClockedSchedule将任务安排在特定时间(比当前时间晚3分钟(运行时,Celery beat会记录任务并保存所有相关设置。该任务在Django管理区域中显示为活动状态。然而,电子邮件从未真正发送过。

clocked = ClockedSchedule.objects.create(clocked_time=datetime.now() + timedelta(minutes=3))
@shared_task(name="schedule_clock_email")
def schedule_email_clocked(name, email, body, send_date):
PeriodicTask.objects.create(
clocked=clocked,
name="Send message at specific date/time",
task="email_task",
one_off=True,
args=json.dumps(['name', 'test@test.com', 'body']),
)

我最终想要根据用户在表单中输入的日期时间动态设置计时字段,因此当前代码只是试图测试Celery的工作方式。不过,我想我错过了一些关于它是如何工作的。任何想法都将不胜感激。

我想是时区。

如果您的配置没有生效。默认情况下,芹菜数据库存储UTC时间。

您可以通过执行以下代码来验证这一点。

Django的shell环境

import datetime
import json
from ProjectName.celerytest import test
execution_time = datetime.datetime.utcnow() + datetime.timedelta(minutes=2)
schedule, created = ClockedSchedule.objects.get_or_create(clocked_time=execution_time)

PeriodicTask.objects.create(clocked=schedule,one_off=True,name="test input",task="projectName.celerytest.test",args=json.dumps(['112233']))

最新更新