How can I synchronize on a per-worker basis to avoid race conditions in methods connected to Celery signals?



I am using the task_prerun and task_postrun signals to keep track of how many tasks are actually being executed on a particular worker at any given moment.

Every time a task starts, I increment an integer stored in a file by 1. When the task finishes, I decrement it by 1.

Since I write these values to a file, I have to account for the race condition that occurs when two tasks start at the same time under the same worker and both task_prerun signal handlers fire simultaneously and access the same file.

How would I handle this? Is it acceptable to create a threading.Lock object at module (global) scope? The lock only has to work on a per-worker basis, so I suppose that declaring it globally, although not great practice, would be fine (see the sketch after the example below).

Note that I do not want the total number of tasks being processed, but the number of tasks being processed by this particular worker.

The reason is to protect instances from being terminated when the minimum size of the auto scaling group changes in the AWS stack... I do not want AWS to kill a machine that is still processing tasks.

Consider the following example:

import os
import time

from celery import Celery
from celery.signals import task_prerun, task_postrun

app = Celery('tasks', broker='pyamqp://guest@localhost/')

# Text file that keeps track of how many tasks are still computing.
counter_file = os.path.join(os.path.dirname(__file__), 'counter.txt')

if not os.path.exists(counter_file):
    with open(counter_file, 'w') as f:
        f.write('0')

@task_prerun.connect
def before(*args, **kwargs):
    """ Open the counter file and increment the value in it. """
    with open(counter_file, 'r+') as f:
        count = int(f.read())
        f.seek(0)
        f.write(str(count + 1))

@task_postrun.connect
def after(*args, **kwargs):
    """ Open the counter file and decrement the value in it. """
    with open(counter_file, 'r+') as f:
        count = int(f.read())
        f.seek(0)
        f.write(str(count - 1))

@app.task
def add(x, y):
    time.sleep(5)
    return x + y
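
For reference, this is roughly the kind of global lock I have in mind (just a sketch). I am aware that a module-level threading.Lock only serializes threads within a single pool process, so with the default prefork pool (separate processes) I have also added an OS-level file lock via fcntl.flock, assuming the workers run on a POSIX system:

import fcntl
import os
import threading

from celery.signals import task_prerun, task_postrun

counter_file = os.path.join(os.path.dirname(__file__), 'counter.txt')

if not os.path.exists(counter_file):
    with open(counter_file, 'w') as f:
        f.write('0')

# Module-level lock: only protects against concurrent threads *within* one
# pool process; it does nothing across the separate prefork processes.
counter_lock = threading.Lock()

def _change_counter(delta):
    with counter_lock:
        with open(counter_file, 'r+') as f:
            # The OS-level lock is what actually guards against other worker
            # processes writing the same file at the same time (POSIX only).
            fcntl.flock(f, fcntl.LOCK_EX)
            count = int(f.read() or 0)
            f.seek(0)
            f.truncate()
            f.write(str(count + delta))
            fcntl.flock(f, fcntl.LOCK_UN)

@task_prerun.connect
def before(*args, **kwargs):
    _change_counter(+1)

@task_postrun.connect
def after(*args, **kwargs):
    _change_counter(-1)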

Solution

I went with the solution proposed by @DejanLekic, using the Inspect class, and it worked. This is the final script with which I load-tested Celery using 2 machines:

# tasks.py
import os
import random
import socket
import threading
import time

from celery import Celery
from celery.signals import task_prerun, task_postrun

app = Celery('tasks', broker=os.getenv('BROKER_ADDR', 'pyamqp://guest@localhost//'))

def get_number_of_tasks_being_executed_by_this_worker(wait_before=0.01):
    time.sleep(wait_before)
    # Do not rely on the worker name, because we are only sure of the hostname, so we
    # cannot use the destination= keyword of the inspect call.
    active_tasks_by_all_workers = app.control.inspect().active()
    # Filter the tasks of the workers on this machine.
    active_tasks_by_this_worker = [
        val for key, val in active_tasks_by_all_workers.items()
        if socket.gethostname() in key
    ]
    # Get the list of tasks of the first (and only, ideally) match.
    active_tasks_by_this_worker = active_tasks_by_this_worker[0] if active_tasks_by_this_worker else []
    return active_tasks_by_this_worker

def check_if_should_protect_from_autoscaling():
    tasks = get_number_of_tasks_being_executed_by_this_worker()
    if tasks:
        print("%d tasks are still running in this worker. Ensure protection is set." % len(tasks))
        # if is_not_protected_against_auto_scaling_group:
        #     set_aws_autoscaling_protection()
    else:
        print("This worker is not executing any tasks. Unsetting protection.")
        # unset_aws_autoscaling_protection()

@task_postrun.connect
def after(*args, **kwargs):
    # Get the number of tasks with a little delay (0.01 seconds suffice), otherwise at
    # this point the current task that executed this method is shown as active.
    threading.Thread(target=check_if_should_protect_from_autoscaling).start()

@app.task
def add(x, y):
    time.sleep(3 * random.random())
    return x + y
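
The set_aws_autoscaling_protection() / unset_aws_autoscaling_protection() calls are left as comments above because they are specific to our stack. A minimal sketch of what they could do with boto3 is shown below; the Auto Scaling group name ('my-asg') and the instance metadata lookup are assumptions for illustration only:

# aws_protection.py -- illustrative sketch, not part of the original script.
import urllib.request

import boto3

AUTOSCALING_GROUP_NAME = 'my-asg'  # assumed name, replace with the real ASG

def _current_instance_id():
    # EC2 instance metadata endpoint (IMDSv1 style, for brevity).
    with urllib.request.urlopen(
            'http://169.254.169.254/latest/meta-data/instance-id', timeout=2) as resp:
        return resp.read().decode()

def set_aws_autoscaling_protection(protect=True):
    client = boto3.client('autoscaling')
    client.set_instance_protection(
        InstanceIds=[_current_instance_id()],
        AutoScalingGroupName=AUTOSCALING_GROUP_NAME,
        ProtectedFromScaleIn=protect,
    )

def unset_aws_autoscaling_protection():
    set_aws_autoscaling_protection(protect=False)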

I am dispatching many tasks from this script:

# dispatcher.py
import asyncio

from tasks import add

async def task():
    add.delay(3, 4)

async def main():
    await asyncio.gather(*[task() for i in range(200)])

if __name__ == '__main__':
    asyncio.run(main())

The output logs seem to confirm the expected behavior:

[2019-09-23 07:50:28,507: WARNING/ForkPoolWorker-3] 10 tasks are still running in this worker. Ensure protection is set.
[2019-09-23 07:50:28,625: WARNING/ForkPoolWorker-1] 8 tasks are still running in this worker. Ensure protection is set.
[2019-09-23 07:50:28,627: WARNING/ForkPoolWorker-7] 8 tasks are still running in this worker. Ensure protection is set.
[2019-09-23 07:50:28,993: WARNING/ForkPoolWorker-4] 7 tasks are still running in this worker. Ensure protection is set.
[2019-09-23 07:50:29,027: INFO/ForkPoolWorker-2] Task tasks.add[c3af9378-5666-42c3-9a37-5d0720b2065a] succeeded in 1.6377690890221857s: 7
[2019-09-23 07:50:29,204: INFO/ForkPoolWorker-9] Task tasks.add[9ca176ce-1590-4670-9947-4656166d224d] succeeded in 2.7913955969852395s: 7
[2019-09-23 07:50:29,224: INFO/ForkPoolWorker-5] Task tasks.add[38d005bc-ff13-4514-aba0-8601e79e67c8] succeeded in 2.0496858750120737s: 7
[2019-09-23 07:50:29,311: WARNING/ForkPoolWorker-8] 5 tasks are still running in this worker. Ensure protection is set.
[2019-09-23 07:50:29,316: WARNING/ForkPoolWorker-6] 5 tasks are still running in this worker. Ensure protection is set.
[2019-09-23 07:50:29,510: WARNING/ForkPoolWorker-10] 4 tasks are still running in this worker. Ensure protection is set.
[2019-09-23 07:50:30,059: WARNING/ForkPoolWorker-2] 3 tasks are still running in this worker. Ensure protection is set.
[2019-09-23 07:50:30,199: INFO/ForkPoolWorker-3] Task tasks.add[991d984a-4434-47a0-8c98-9508ca980f0b] succeeded in 2.7176807850482874s: 7
[2019-09-23 07:50:30,239: WARNING/ForkPoolWorker-9] 1 tasks are still running in this worker. Ensure protection is set.
[2019-09-23 07:50:30,250: WARNING/ForkPoolWorker-5] 1 tasks are still running in this worker. Ensure protection is set.
[2019-09-23 07:50:31,226: WARNING/ForkPoolWorker-3] This worker is not executing any tasks. Unsetting protection.

So everything works fine! :D

We implemented Celery autoscaling (on AWS) using some out-of-the-box Celery features. For your requirement we use Celery's control API (https://docs.celeryproject.org/en/latest/reference/celery.app.control.html). The key part of it is the Inspect section. The Inspect class can take a destination argument, which is the Celery node you want to inspect. We do not use it, as we want to inspect all nodes in the cluster, but perhaps you need to do it differently. You should familiarise yourself with this class and its .active() method, which gives you a list of active tasks either for a given set of workers or for the whole cluster (if no destination is given).
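
As a quick illustration of that API (the worker name 'celery@my-host' is only a placeholder), inspecting a single node versus the whole cluster looks roughly like this:

from celery import Celery

app = Celery('tasks', broker='pyamqp://guest@localhost//')

# Active tasks of one specific node (the node name is a placeholder).
active_on_one_node = app.control.inspect(destination=['celery@my-host']).active()

# Active tasks of every node in the cluster; returns a mapping of worker name
# to its list of currently executing tasks (or None if no worker replies).
active_everywhere = app.control.inspect().active()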
