我查看了芹菜文档并从中尝试了一些东西,但它不像示例那样工作。 也许我在某个时候错了,如果我对以下代码有误,请给我一些指针
在 views.py 我有这样的东西:
class Something(CreateView):
model = something
def form_valid(self, form):
obj = form.save(commit=False)
number = 5
test_limit = datetime.now() + timedelta(minutes=5)
testing_something.apply_async((obj, number), eta=test_limit)
obj.save()
在芹菜任务中,我写了这样的东西:
@shared_task()
def add_number(obj, number):
base = Base.objects.get(id=1)
base.add = base.number + number
base.save()
return obj
我对这段代码的条件是芹菜在 CreateView 运行后立即运行,我的目标是在运行 Something CreateView 后每 5 分钟运行一次任务add_number。非常感谢
编辑:
- 我尝试将
eta
更改为countdown=180
但它仍然立即add_number
运行功能。 我也尝试了更长的倒计时,但仍然立即运行 - 我已经尝试了@johnmoustafis答案,但仍然相同,任务立即运行
- 我也尝试过@dana答案,但它仍然是一样的,任务立即运行
默认情况下,芹菜使用 UTC 时间。
如果您的时区"落后于"UTC (UTC - hh:mm(,则datetime.now()
调用将返回"落后于"UTC的时间戳,从而导致您的任务立即执行。
您可以改用datetime.utcnow()
:
test_limit = datetime.utcnow() + timedelta(minutes=5)
既然你使用的是 django,还有另一种选择:
如果你在setting.py
中设置了USE_TZ = True
,你已经启用了 django 时区设置,你可以使用timezone.now()
而不是datetime.utcnow()
:
from django.utils import timezone
...
test_limit = timezone.now() + timedelta(minutes=5)
'test_limit' 变量没有时区信息。所以芹菜会将 eta param 理解为 UTC 时间。
请使用修改后的代码:
class Something(CreateView):
model = something
def form_valid(self, form):
obj = form.save(commit=False)
number = 5
test_limit = datetime.now()
test_limit = test_limit.replace(tzinfo=tz.tzlocal())
test_limit = test_limit + timedelta(minutes=5)
testing_something.apply_async((obj, number), eta=test_limit)
obj.save()
您可能具有CELERY_ALWAYS_EAGER=True
设置。
您能否也发布您的配置和您正在使用的芹菜版本?
在这里,您可能会找到一些有用的信息。
我在使用 celery版本 5.1.0 时遇到了同样的问题,我知道 celery 配置"CELERY_ALWAYS_EAGER"名称已在 4.0+ 版中更改为"CELERY_TASK_ALWAYS_EAGER"。
因此,如果您使用的是芹菜版本4.0+,请确保您已设置CELERY_TASK_ALWAYS_EAGER=False
使用apply_async方法的芹菜任务应在 eta 或倒计时中以指定的延迟执行,并且两者都应根据apply_async定义工作
def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
link=None, link_error=None, shadow=None, **options):
"""Apply tasks asynchronously by sending a message.
Arguments:
args (Tuple): The positional arguments to pass on to the task.
kwargs (Dict): The keyword arguments to pass on to the task.
countdown (float): Number of seconds into the future that the
task should execute. Defaults to immediate execution.
eta (~datetime.datetime): Absolute time and date of when the task
should be executed. May not be specified if `countdown`
is also supplied.
expires (float, ~datetime.datetime): Datetime or
seconds in the future for the task should expire.
The task won't be executed after the expiration time.
以下代码对我来说效果很好。
项目/任务.py
@app.task
def delete_item(item_pk):
try:
item = Item.objects.get(pk=item_pk)
item.delete()
except ObjectDoesNotExist:
logging.warning(f"Cannot find item with id: {item_pk}.")
现在,您可以通过以下方式在逻辑中调用此函数:
...
delete_item.apply_async((item.pk,), countdown=60) # execute after 1 minute
...
(或(
from datetime import datetime, timedelta
...
eta = datetime.now() + timedelta(seconds=60)
delete_item.apply_async((item.pk,), eta=eta) # execute after 1 minute
...