当我将 Django Celery apply_async 与 eta 一起使用时,它会立即完成这项工作



我查看了芹菜文档并从中尝试了一些东西,但它不像示例那样工作。 也许我在某个时候错了,如果我对以下代码有误,请给我一些指针

在 views.py 我有这样的东西:

class Something(CreateView):
model = something
def form_valid(self, form):
obj = form.save(commit=False)
number = 5
test_limit = datetime.now() + timedelta(minutes=5)
testing_something.apply_async((obj, number), eta=test_limit)
obj.save()

在芹菜任务中,我写了这样的东西:

@shared_task()
def add_number(obj, number):
base = Base.objects.get(id=1)
base.add = base.number + number
base.save()
return obj

我对这段代码的条件是芹菜在 CreateView 运行后立即运行,我的目标是在运行 Something CreateView 后每 5 分钟运行一次任务add_number。非常感谢

编辑:

  1. 我尝试将eta更改为countdown=180但它仍然立即add_number运行功能。 我也尝试了更长的倒计时,但仍然立即运行
  2. 我已经尝试了@johnmoustafis答案,但仍然相同,任务立即运行
  3. 我也尝试过@dana答案,但它仍然是一样的,任务立即运行

默认情况下,芹菜使用 UTC 时间。
如果您的时区"落后于"UTC (UTC - hh:mm(,则datetime.now()调用将返回"落后于"UTC的时间戳,从而导致您的任务立即执行。

您可以改用datetime.utcnow()

test_limit = datetime.utcnow() + timedelta(minutes=5)

既然你使用的是 django,还有另一种选择:

如果你在setting.py中设置了USE_TZ = True,你已经启用了 django 时区设置,你可以使用timezone.now()而不是datetime.utcnow()

from django.utils import timezone
...
test_limit = timezone.now() + timedelta(minutes=5)

'test_limit' 变量没有时区信息。所以芹菜会将 eta param 理解为 UTC 时间。

请使用修改后的代码:

class Something(CreateView):
model = something
def form_valid(self, form):
obj = form.save(commit=False)
number = 5
test_limit = datetime.now()
test_limit = test_limit.replace(tzinfo=tz.tzlocal())
test_limit = test_limit + timedelta(minutes=5)
testing_something.apply_async((obj, number), eta=test_limit)
obj.save()

您可能具有CELERY_ALWAYS_EAGER=True设置。

您能否也发布您的配置和您正在使用的芹菜版本?

在这里,您可能会找到一些有用的信息。

我在使用 celery版本 5.1.0 时遇到了同样的问题,我知道 celery 配置"CELERY_ALWAYS_EAGER"名称已在 4.0+ 版中更改为"CELERY_TASK_ALWAYS_EAGER"。

因此,如果您使用的是芹菜版本4.0+,请确保您已设置CELERY_TASK_ALWAYS_EAGER=False

使用apply_async方法的芹菜任务应在 eta 或倒计时中以指定的延迟执行,并且两者都应根据apply_async定义工作

def apply_async(self, args=None, kwargs=None, task_id=None, producer=None,
link=None, link_error=None, shadow=None, **options):
"""Apply tasks asynchronously by sending a message.
Arguments:
args (Tuple): The positional arguments to pass on to the task.
kwargs (Dict): The keyword arguments to pass on to the task.
countdown (float): Number of seconds into the future that the
task should execute.  Defaults to immediate execution.
eta (~datetime.datetime): Absolute time and date of when the task
should be executed.  May not be specified if `countdown`
is also supplied.
expires (float, ~datetime.datetime): Datetime or
seconds in the future for the task should expire.
The task won't be executed after the expiration time.

以下代码对我来说效果很好。

项目/任务.py

@app.task
def delete_item(item_pk):
try:
item = Item.objects.get(pk=item_pk)
item.delete()
except ObjectDoesNotExist:
logging.warning(f"Cannot find item with id: {item_pk}.")

现在,您可以通过以下方式在逻辑中调用此函数:

...
delete_item.apply_async((item.pk,), countdown=60)  # execute after 1 minute
...

(或(

from datetime import datetime, timedelta
...
eta = datetime.now() + timedelta(seconds=60)
delete_item.apply_async((item.pk,), eta=eta)  # execute after 1 minute
...

相关内容

  • 没有找到相关文章

最新更新