我正在尝试使用Celery来处理输入列表。我只想处理每个输入一次。问题是我的服务器都是超级计算机集群的一部分。我可以向每个服务器发送一个命令来启动一个进程。一旦该服务器被安排为我的用户名执行工作(在将来的某个随机时间发生(,它将启动该过程(因此随时运行的服务器数量是不确定的(。我希望所有为我的用户名执行工作的服务器共享可用工作,直到完成所有必需的工作。
不过,我对如何协调这一点感到困惑。
这是我的app.py
,它概述了服务器要使用的任务:
from celery import Celery
app = Celery('tasks',
backend='redis://localhost:6379/0',
broker='redis://localhost:6379/0')
@app.task
def add(x, y):
with open('results.txt', 'a') as out:
out.write(str(x + y) + 'n')
以下是安排工作的脚本(worker.py
(:
'''Worker node; executes tasks outlined in app.py'''
from app import add
# run the add function and pass in arguments
for i in range(10000):
result = add.apply_async(args=[1,i]).get()
在我的本地机器上,如果我在终端中运行celery worker -l info -A app
,这将启动芹菜应用程序。如果我然后运行worker.py
,我看到工作正在被搅动。
如何让多个不同的主机使用未完成的任务?每个服务器都可以访问运行 Redis 的静态 IP。我是否将celery worker -l info -A app
命令提交给每个主机?如果是这样,每个主机在上线时会神奇地消耗未完成的工作吗?如果其他人能为这些高级问题提供任何帮助,我将非常感谢!
为了回答上面的问题,我创建了一个名为app.py
的文件,并将其加载到我可以 ssh 的前端节点。此文件概述了不同服务器上的各个工作人员将处理的功能:
from celery import Celery
app = Celery('tasks',
backend='redis://daccssfe.crc.nd.edu:6379/0',
broker='redis://daccssfe.crc.nd.edu:6379/0')
@app.task
def log(*args):
# have all workers write their results to a common outfile
with open('/scratch365/dduhaime/celery-test.txt', 'a') as out:
out.write('-'.join([str(i).strip() for i in args]) + 'n')
接下来,我定义了一个函数schedule_work.py
来调度要完成的工作:
'''Worker node; executes tasks outlined in app.py'''
from app import log
# run the add function and pass in arguments
for i in range(10000):
print('* processing', i)
result = log.apply_async(args=[str(i)]).get()
此文件创建 10,000 个要执行的工作单元,并将每个整数 0:10000-1 传递给工作队列。当工作人员联机时,他们将处理此队列。
为了添加工作线程,我使用大学的超级计算系统创建了 10 个工作线程,每个工作线程启动app.py
文件,这将使工作线程使用堆栈中的工作。为了使用 Sun Grid Engine 队列系统(我正在研究的超级计算机使用它作为作业提交协议(来做到这一点,我将以下内容保存在文件start_workers.sh
中:
#!/bin/bash
#$ -N celery
#$ -o logs/celery.log
#$ -t 1-10:1
#$ -pe smp 4
#$ -q long
#$ -r y
source ~/.bash_profile
source celery-env/bin/activate
# add a new worker
celery worker -l info -A app
然后我提交了这些工作(qsub start_workers.sh
(,开始了10个工人,每个工人都从要做的工作列表中抽出。最后,他们都记录了他们的主机地址和要完成的工作列表中的参数到他们都可以访问的请求的输出文件中。正如我们在结果文件中所看到的,10 个工作主机集中的不同主机消耗了不同的输入:
# /scratch365/dduhaime/celery-test.txt content
10.32.77.210-0
10.32.77.210-1
10.32.77.132-2
10.32.77.210-3
10.32.77.142-4
10.32.77.132-5
10.32.77.210-6
10.32.77.192-7
10.32.77.116-8
10.32.77.142-9
10.32.77.132-10
...