具有复杂芹菜链的自动缩放芹菜工人



我正在尝试起草自动缩放如何与具有复杂的基于链的工作流程的芹菜一起工作。

电流:

下载 30 个 CSV(每 6 小时(,然后创建一个芹菜组来监视每个 CSV 被预处理并保存在/tmp/folder/上的各个线程

在所有任务成功且芹菜组返回 True 后,/tmp/folder/将被压缩并存储在 S3 上,其他系统将通过 API 调用收到通知。

面临的挑战

我们有大约 40-50 个任务的待处理任务,这使得整个过程非常缓慢。

建议的解决方案

自动缩放,即根据挂起任务的数量添加更多工作服务器。

这种方法适用于我拥有的工作流程吗?或者是否有可能有一个垂直缩放解决方案?解决这个问题的最佳方法是什么?

@app.task
def process_csv(path_of_csv):
# preprocessing the csv and storing in /tmp/folder/
return True
res = group(process_csv.s(path) for path in all_paths)()
with allow_join_result():
print(res.get())
if 'False' not in res.get():
# Time to store to S3

环境信息

aiodns==2.0.0
aiohttp==3.5.4
amqp==2.5.0
async-timeout==3.0.1
attrs==19.1.0
Babel==2.7.0
billiard==3.5.0.5
boto3==1.9.197
botocore==1.12.197
celery==4.1.1
certifi==2019.6.16
cffi==1.12.3
chardet==3.0.4
ddtrace==0.31.0
Django==2.2.3
django-enumfields==1.0.0
django-extensions==2.2.1
djangorestframework==3.10.1
docutils==0.14
flower==0.9.3
gevent==1.4.0
greenlet==0.4.15
gunicorn==19.9.0
idna==2.8
idna-ssl==1.1.0
jmespath==0.9.4
kombu==4.6.3
multidict==4.5.2
psutil==5.6.5
psycopg2-binary==2.8.3
pycares==3.0.0
pycparser==2.19
python-dateutil==2.8.0
pytz==2019.1
redis==3.3.0
requests==2.22.0
s3transfer==0.2.1
six==1.12.0
slackclient==2.0.0
sqlparse==0.3.0
tornado==5.1.1
typing==3.7.4
typing-extensions==3.7.4
urllib3==1.25.3
vine==1.3.0
websocket==0.2.1
websocket-client==0.56.0
Werkzeug==0.15.5
yarl==1.3.0

我正在使用RabbitMQ代理和Redis作为后端。

谢谢

如果我理解正确,这涉及 4 个步骤

  1. 下载视频

  2. 处理 CSV

  3. 压缩/tmp/文件夹

  4. 上传到云 (S3(

瓶颈处于步骤2。

您提到了 40-50 个待处理任务,假设您每个任务处理一个文件,这应该等于或小于 30(CSV 文件数(。 为什么有 40-50 个任务?

这我不清楚。您能解释一下为什么任务比预处理的文件数量多吗?

在探索自动缩放之前,您可以探索--concurrency,将其设置为 30 或更多并尝试查找它是否仍然是问题所在,这将有助于找到瓶颈。

https://docs.celeryproject.org/en/latest/userguide/workers.html#concurrency

顺便说一句,又看到了一个线程,芹菜 4.0 中的自动缩放似乎坏了(你的是celery==4.1.1(,不确定。检查线程: https://github.com/celery/celery/issues/4003

最新更新