How do I convert a script that uses ssh to use pbsdsh instead, while using Ray?



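I am stuck converting a script that uses ssh to activate nodes into one that uses pbsdsh. I use Ray for communication between nodes. My ssh script is: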

#!/bin/bash
#PBS -N Experiment_1
#PBS -l select=2:ncpus=24:mpiprocs=24
#PBS -P CSCIxxxx
#PBS -q normal
#PBS -l walltime=01:30:00
#PBS -m abe
#PBS -M xxxxx@gmail.com
ln -s $PWD $PBS_O_WORKDIR/$PBS_JOBID
cd $PBS_O_WORKDIR
jobnodes=`uniq -c ${PBS_NODEFILE} | awk -F. '{print $1 }' | awk '{print $2}' | paste -s -d " "`
thishost=`uname -n | awk -F. '{print $1}'`
thishostip=`hostname -i`
rayport=6379

thishostNport="${thishostip}:${rayport}"
echo "Allocate Nodes = <$jobnodes>"
export thishostNport

echo "set up ray cluster..." 
for n in `echo ${jobnodes}`
do
    if [[ ${n} == "${thishost}" ]]
    then
        echo "first allocate node - use as headnode ..."
        module load chpc/python/anaconda/3-2019.10
        source /apps/chpc/chem/anaconda3-2019.10/etc/profile.d/conda.sh
        conda activate /home/mnasir/env1
        ray start --head
        sleep 5
    else
        ssh ${n}  $PBS_O_WORKDIR/startWorkerNode.pbs ${thishostNport}
        sleep 10
    fi
done

python -u example_trainer.py 48
rm $PBS_O_WORKDIR/$PBS_JOBID
#

where startWorkerNode.pbs is:

#!/bin/bash -l
source $HOME/.bashrc
cd $PBS_O_WORKDIR
param1=$1
destnode=`uname -n`
echo "destnode is = [$destnode]"
module load chpc/python/anaconda/3-2019.10
source /apps/chpc/chem/anaconda3-2019.10/etc/profile.d/conda.sh
conda activate /home/mnasir/poet
ray start --address="${param1}" --redis-password='5241590000000000'

and example_trainer.py is:

from collections import Counter
import os
import socket
import sys
import time
import ray
num_cpus = int(sys.argv[1])
ray.init(address=os.environ["thishostNport"])
print("Nodes in the Ray cluster:")
print(ray.nodes()) # This should print all N nodes we are trying to access

@ray.remote
def f():
    time.sleep(1)
    return socket.gethostbyname(socket.gethostname()) + "--" + str(socket.gethostname())

# The following takes one second (assuming that
# ray was able to access all of the allocated nodes).
for i in range(60):
    start = time.time()
    ip_addresses = ray.get([f.remote() for _ in range(num_cpus)])
    print("GOT IPs", ip_addresses)
    print(Counter(ip_addresses))
    end = time.time()
    print(end - start)

This works perfectly and communicates across all nodes, but when I try to change the command to pbsdsh, it returns:

pbsdsh: task 0x00000000 exit status 254
pbsdsh: task 0x00000001 exit status 254

This is repeated 48 times when mpiprocs is set to 24 instead of 1.

As far as I understand, Ray needs a head node, and the worker nodes then connect to it, hence the for loop with the if statement inside it.

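I have tried replacing ssh with pbsdsh directly in the script, with and without specifying the node. I have also moved pbsdsh outside the loop and tried a large number of possible combinations.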

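I followed these questions but could not get my code to communicate across the nodes:

  • PBS/TORQUE: how do I submit a parallel job on multiple nodes?
  • How to execute a script on every allocated node using PBS
  • Handling multiple nodes in one PBS job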

I believe there is probably something small that I am missing. Your help and guidance would be much appreciated!

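To solve this, the following main changes are needed:

  1. Use #PBS -l select=2:ncpus=24:mpiprocs=1 as the select line; that is, change mpiprocs from 24 to 1 so that pbsdsh launches only one process per node instead of 24.

  2. In jobscript.sh, in the else branch, use pbsdsh -n $J -- $PBS_O_WORKDIR/startWorkerNode.pbs ${thishostNport} & to run pbsdsh on a single node only, and in the background. J holds the node index and is incremented on every iteration of the for loop. As a result, ray start runs once on each node.

  3. In startWorkerNode.pbs, add the following code at the end: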

# Here, sleep for the duration of the job, so ray does not stop
WALLTIME=$(qstat -f $PBS_JOBID | sed -rn 's/.*Resource_List.walltime = (.*)/\1/p')
# SECONDS is a special bash variable, so use a different name
sleep_seconds=$(echo "$WALLTIME" | awk -F: '{ print ($1 * 3600) + ($2 * 60) + $3 }')
echo "SLEEPING FOR $sleep_seconds s"
sleep $sleep_seconds

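This ensures that Ray does not stop as soon as the pbsdsh command returns, and that it stays alive for the duration of the job. The & from the previous point is needed for the same reason: without it, pbsdsh would block until the sleep ends and the loop would not move on.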
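The walltime conversion done by the sed and awk lines can be checked offline with a small Python sketch. It assumes the walltime appears in qstat -f output as a line of the form Resource_List.walltime = HH:MM:SS, which is what the sed expression targets; the function names and the sample text are hypothetical and only illustrate the arithmetic.

```python
import re


def extract_walltime(qstat_output: str) -> str:
    """Return the Resource_List.walltime value from `qstat -f` style text."""
    match = re.search(r"Resource_List\.walltime = (\S+)", qstat_output)
    if match is None:
        raise ValueError("no Resource_List.walltime line found")
    return match.group(1)


def walltime_to_seconds(walltime: str) -> int:
    """Convert an HH:MM:SS string to seconds, like the awk expression does."""
    hours, minutes, seconds = (int(part) for part in walltime.strip().split(":"))
    return hours * 3600 + minutes * 60 + seconds


# Hypothetical sample text, shaped like the line the sed expression matches.
sample = (
    "    Resource_List.select = 2:ncpus=24:mpiprocs=1\n"
    "    Resource_List.walltime = 01:30:00\n"
)

print(walltime_to_seconds(extract_walltime(sample)))  # 5400
```

For the 01:30:00 walltime requested in the job script, the worker script would therefore sleep for 5400 seconds.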

Here are the files for reference:

startWorkerNode.pbs

#!/bin/bash -l
source $HOME/.bashrc
cd $PBS_O_WORKDIR
param1=$1
destnode=`uname -n`
echo "destnode is = [$destnode]"
module load chpc/python/anaconda/3-2019.10
source /apps/chpc/chem/anaconda3-2019.10/etc/profile.d/conda.sh
conda activate /home/mnasir/poet
ray start --address="${param1}" --redis-password='5241590000000000'
# Here, sleep for the duration of the job, so ray does not stop
WALLTIME=$(qstat -f $PBS_JOBID | sed -rn 's/.*Resource_List.walltime = (.*)/\1/p')
# SECONDS is a special bash variable, so use a different name
sleep_seconds=$(echo "$WALLTIME" | awk -F: '{ print ($1 * 3600) + ($2 * 60) + $3 }')
echo "SLEEPING FOR $sleep_seconds s"
sleep $sleep_seconds

jobscript.sh

#!/bin/bash
#PBS -N Experiment_1
#PBS -l select=2:ncpus=24:mpiprocs=1
#PBS -P CSCIxxxx
#PBS -q normal
#PBS -l walltime=01:30:00
#PBS -m abe
#PBS -M xxxxx@gmail.com
ln -s $PWD $PBS_O_WORKDIR/$PBS_JOBID
cd $PBS_O_WORKDIR
jobnodes=`uniq -c ${PBS_NODEFILE} | awk -F. '{print $1 }' | awk '{print $2}' | paste -s -d " "`
thishost=`uname -n | awk -F. '{print $1}'`
thishostip=`hostname -i`
rayport=6379

thishostNport="${thishostip}:${rayport}"
echo "Allocate Nodes = <$jobnodes>"
export thishostNport

echo "set up ray cluster..." 
J=0
for n in `echo ${jobnodes}`
do
    if [[ ${n} == "${thishost}" ]]
    then
        echo "first allocate node - use as headnode ..."
        module load chpc/python/anaconda/3-2019.10
        source /apps/chpc/chem/anaconda3-2019.10/etc/profile.d/conda.sh
        conda activate /home/mnasir/env1
        ray start --head
        sleep 5
    else
        # Run pbsdsh on the J'th node, and do it in the background.
        pbsdsh -n $J -- $PBS_O_WORKDIR/startWorkerNode.pbs ${thishostNport} &
        sleep 10
    fi
    J=$((J+1))
done

python -u example_trainer.py 48
rm $PBS_O_WORKDIR/$PBS_JOBID
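
The node bookkeeping in jobscript.sh can be sketched in Python to make the role of mpiprocs and of the index J easier to see. This is only an illustration under assumptions: the host names, address, and working directory are made up, the helper names are hypothetical, and it assumes the index given to pbsdsh -n follows the same order as the de-duplicated node list, which is what the script above relies on. It builds the command strings without running them.

```python
def unique_short_hosts(nodefile_lines):
    """Roughly mirror the uniq/awk pipeline: collapse consecutive
    repeats and keep only the part of each host name before the first dot."""
    hosts = []
    for line in nodefile_lines:
        short = line.strip().split(".")[0]
        if short and (not hosts or hosts[-1] != short):
            hosts.append(short)
    return hosts


def launch_plan(hosts, this_host, head_address, workdir):
    """Return one command string per node: the head on this_host,
    a backgrounded pbsdsh worker launch on every other node."""
    plan = []
    for index, host in enumerate(hosts):  # index plays the role of J
        if host == this_host:
            plan.append("ray start --head")
        else:
            plan.append(
                f"pbsdsh -n {index} -- {workdir}/startWorkerNode.pbs {head_address} &"
            )
    return plan


# Hypothetical node file contents for select=2.
nodefile_mpiprocs_24 = ["cnode0001.cluster"] * 24 + ["cnode0002.cluster"] * 24
nodefile_mpiprocs_1 = ["cnode0001.cluster", "cnode0002.cluster"]

# With mpiprocs=24 there are 48 entries but still only 2 distinct hosts.
print(len(nodefile_mpiprocs_24), unique_short_hosts(nodefile_mpiprocs_24))

hosts = unique_short_hosts(nodefile_mpiprocs_1)
plan = launch_plan(hosts, "cnode0001", "10.0.0.1:6379", "/home/user/job")
for command in plan:
    print(command)
```

The 48 entries in the mpiprocs=24 case would match the 48 repetitions reported in the question if pbsdsh spawns one task per entry, while with mpiprocs=1 each host appears once and the loop index lines up with one node per worker launch.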
