Celery Beat:一次只能有一个任务实例



我有芹菜拍和芹菜(四个工人)来批量做一些加工步骤。其中一项任务大致如下:"对于每个没有创建Y的X,创建一个Y。">

该任务以半快速的速率(10秒)定期运行。任务完成得很快。还有其他任务正在进行中。

我曾多次遇到节拍任务明显积压的问题,因此同一任务(来自不同节拍)同时执行,导致错误地重复工作。任务的执行似乎也不正常。

  1. 是否可以限制芹菜节拍以确保一次任务只有一个未完成的实例?在任务上设置类似rate_limit=5的内容是"正确"的方法吗?

  2. 是否可以确保beat任务按顺序执行,例如,beat将任务添加到任务链中,而不是调度任务?

  3. 除了让这些任务本身以原子方式执行并且可以安全地同时执行之外,处理这一问题的最佳方法是什么?这不是我所期望的对节拍任务的限制…

任务本身的定义很天真:

@periodic_task(run_every=timedelta(seconds=10))
def add_y_to_xs():
# Do things in a database
return

这是一个实际的(已清理的)日志:

  • [00:00.000]foocorp.tasks.add_y_to_xs已发送。id->#1
  • [00:00.001]收到的任务:foocorp.tasks.add_y_to_xs[#1]
  • 已发送[00:10.009]foocorp.tasks.add_y_to_xs。id->#2
  • 已发送[00:20.024]foocorp.tasks.add_y_to_xs。id->#3
  • [00:26.747]收到的任务:foocorp.tasks.add_y_to_xs[#2]
  • [00:26.748]任务池:应用#2
  • [00:26.752]收到的任务:foocorp.tasks.add_y_to_xs[#3]
  • 接受[00:26.769]任务:foocorp.tasks.add_y_to_xs[#2]pid:26528
  • [00:26.775]任务foocorp.tasks.add_y_to_xs[#2]成功0.0197986490093s:无
  • [00:26.806]任务池:应用#1
  • [00:26.836]任务池:应用#3
  • [01:30.020]接受任务:foocorp.tasks.add_y_to_xs[#1]pid:26526
  • [01:30.053]接受任务:foocorp.tasks.add_y_to_xs[#3]pid:26529
  • [01:30.055]foocorp.tasks.add_y_to_xs[#1]:为X id#9725添加y
  • [01:30.070]foocorp.tasks.add_y_to_xs[#3]:为X id#9725添加y
  • [01:30.074]任务foocorp.tasks.add_y_to_xs[#1]成功0.0594762689434s:None
  • [01:30.087]任务foocorp.tasks.add_y_to_xs[#3]成功0.0352867960464s:无

我们目前使用Celery 3.1.4和RabbitMQ作为传输。

编辑丹,这是我想到的:

丹,这是我最终使用的:

from sqlalchemy import func
from sqlalchemy.exc import DBAPIError
from contextlib import contextmanager

def _psql_advisory_lock_blocking(conn, lock_id, shared, timeout):
lock_fn = (func.pg_advisory_xact_lock_shared
if shared else
func.pg_advisory_xact_lock)
if timeout:
conn.execute(text('SET statement_timeout TO :timeout'),
timeout=timeout)
try:
conn.execute(select([lock_fn(lock_id)]))
except DBAPIError:
return False
return True

def _psql_advisory_lock_nonblocking(conn, lock_id, shared):
lock_fn = (func.pg_try_advisory_xact_lock_shared
if shared else
func.pg_try_advisory_xact_lock)
return conn.execute(select([lock_fn(lock_id)])).scalar()

class DatabaseLockFailed(Exception):
pass

@contextmanager
def db_lock(engine, name, shared=False, block=True, timeout=None):
"""
Context manager which acquires a PSQL advisory transaction lock with a
specified name.
"""
lock_id = hash(name)
with engine.begin() as conn, conn.begin():
if block:
locked = _psql_advisory_lock_blocking(conn, lock_id, shared,
timeout)
else:
locked = _psql_advisory_lock_nonblocking(conn, lock_id, shared)
if not locked:
raise DatabaseLockFailed()
yield

以及芹菜任务装饰器(仅用于周期性任务):

from functools import wraps
from preo.extensions import db

def locked(name=None, block=True, timeout='1s'):
"""
Using a PostgreSQL advisory transaction lock, only runs this task if the
lock is available. Otherwise logs a message and returns `None`.
"""
def with_task(fn):
lock_id = name or 'celery:{}.{}'.format(fn.__module__, fn.__name__)
@wraps(fn)
def f(*args, **kwargs):
try:
with db_lock(db.engine, name=lock_id, block=block,
timeout=timeout):
return fn(*args, **kwargs)
except DatabaseLockFailed:
logger.error('Failed to get lock.')
return None
return f
return with_task
from functools import wraps
from celery import shared_task

def skip_if_running(f):
task_name = f'{f.__module__}.{f.__name__}'
@wraps(f)
def wrapped(self, *args, **kwargs):
workers = self.app.control.inspect().active()
for worker, tasks in workers.items():
for task in tasks:
if (task_name == task['name'] and
tuple(args) == tuple(task['args']) and
kwargs == task['kwargs'] and
self.request.id != task['id']):
print(f'task {task_name} ({args}, {kwargs}) is running on {worker}, skipping')
return None
return f(self, *args, **kwargs)
return wrapped

@shared_task(bind=True)
@skip_if_running
def test_single_task(self):
pass

test_single_task.delay()

实现这一点的唯一方法是自己实现锁定策略:

阅读此处的章节以获取参考。

与cron一样,如果第一个任务没有在下一个之前完成。如果这是一个问题,您应该使用锁定策略,以确保一次只能运行一个实例(请参阅例如确保一个任务一次只执行一个)。

我曾经使用celeron解决了这个问题,并将其扩展到了celeron。

两者都适用于您的问题。它使用Redis来锁定正在运行的任务。celery-one还将跟踪正在锁定的任务。

下面是芹菜节拍的一个非常简单的用法示例。在下面的代码中,slow_task每1秒调度一次,但它的完成时间是5秒。正常的芹菜会安排每秒钟的任务,即使它已经在运行。CCD_ 21将阻止这种情况。

celery = Celery('test')
celery.conf.ONE_REDIS_URL = REDIS_URL
celery.conf.ONE_DEFAULT_TIMEOUT = 60 * 60
celery.conf.BROKER_URL = REDIS_URL
celery.conf.CELERY_RESULT_BACKEND = REDIS_URL
from datetime import timedelta
celery.conf.CELERYBEAT_SCHEDULE = {
'add-every-30-seconds': {
'task': 'tasks.slow_task',
'schedule': timedelta(seconds=1),
'args': (1,)
},
}
celery.conf.CELERY_TIMEZONE = 'UTC'

@celery.task(base=QueueOne, one_options={'fail': False})
def slow_task(a):
print("Running")
sleep(5)
return "Done " + str(a)

我尝试编写一个装饰器来使用Postgres咨询锁定,类似于erydo在评论中提到的。

它不是很漂亮,但似乎工作正常。这是Python 2.7下的SQLAlchemy 0.9.7。

from functools import wraps
from sqlalchemy import select, func
from my_db_module import Session # SQLAlchemy ORM scoped_session
def pg_locked(key):
def decorator(f):
@wraps(f)
def wrapped(*args, **kw):
session = db.Session()
try:
acquired, = session.execute(select([func.pg_try_advisory_lock(key)])).fetchone()
if acquired:
return f(*args, **kw)
finally:
if acquired:
session.execute(select([func.pg_advisory_unlock(key)]))
return wrapped
return decorator
@app.task
@pg_locked(0xdeadbeef)
def singleton_task():
# only 1x this task can run at a time
pass

(欢迎对改进方法发表任何意见!)

需要一个分布式锁定系统,因为这些Celery beat实例本质上是不同的进程,可能跨不同的主机。

ZooKeeper和etcd等中心坐标系统适用于分布式锁定系统的实现。

我建议使用etcd,它重量轻,速度快。有几种锁定etcd的实现方式,例如:

python etcd锁定

相关内容

  • 没有找到相关文章

最新更新