我正在尝试使用 optuna 来搜索超参数空间。
在一个特定的场景中,我在一台具有几个 GPU 的机器上训练模型。 模型和批量大小允许我每 1 个 GPU 运行 1 次训练。 所以,理想情况下,我想让optuna将所有试验传播到可用的GPU上 这样每个 GPU 上始终运行 1 个试用版。
在文档中它说,我应该在单独的终端中为每个 GPU 启动一个进程,例如:
CUDA_VISIBLE_DEVICES=0 optuna study optimize foo.py objective --study foo --storage sqlite:///example.db
我想避免这种情况,因为之后整个超参数搜索会继续多轮。我不想总是手动启动每个 GPU 的进程,检查何时完成,然后开始下一轮。
我看到study.optimize
有一个n_jobs
论点。 乍一看,这似乎是完美的。例如我可以这样做:
import optuna
def objective(trial):
# the actual model would be trained here
# the trainer here would need to know which GPU
# it should be using
best_val_loss = trainer(**trial.params)
return best_val_loss
study = optuna.create_study()
study.optimize(objective, n_trials=100, n_jobs=8)
这将启动多个线程,每个线程启动一个训练。 但是,objective
中的训练器以某种方式需要知道它应该使用哪个 GPU。 有没有诀窍可以做到这一点?
经过几次精神崩溃,我发现我可以用multiprocessing.Queue
做我想做的事。为了将其放入目标函数中,我需要将其定义为lambda函数或类(我想部分也可以(。例如
from contextlib import contextmanager
import multiprocessing
N_GPUS = 2
class GpuQueue:
def __init__(self):
self.queue = multiprocessing.Manager().Queue()
all_idxs = list(range(N_GPUS)) if N_GPUS > 0 else [None]
for idx in all_idxs:
self.queue.put(idx)
@contextmanager
def one_gpu_per_process(self):
current_idx = self.queue.get()
yield current_idx
self.queue.put(current_idx)
class Objective:
def __init__(self, gpu_queue: GpuQueue):
self.gpu_queue = gpu_queue
def __call__(self, trial: Trial):
with self.gpu_queue.one_gpu_per_process() as gpu_i:
best_val_loss = trainer(**trial.params, gpu=gpu_i)
return best_val_loss
if __name__ == '__main__':
study = optuna.create_study()
study.optimize(Objective(GpuQueue()), n_trials=100, n_jobs=8)
如果您想要一个将参数传递给多个作业使用的目标函数的记录解决方案,那么 Optuna 文档提供了两种解决方案:
- 可调用类(可以与多处理结合使用(,
- lambda 函数包装器(注意:更简单,但不适用于多处理(。
如果您准备采取一些捷径,那么您可以通过直接(通过 python 环境(将全局值(常量,例如使用的 GPU 数量(传递给__call__()
方法(而不是作为__init__()
的参数(来跳过一些样板。
可调用类解决方案经过测试,可以与两个多处理后端(loky/multiprocessing(和远程数据库后端(mariadb/postgresql(一起工作(在optuna==2.0.0
年(。
为了克服这个问题,如果引入了一个全局变量来跟踪当前正在使用的 GPU,然后可以在目标函数中读出该变量。代码如下所示。
EPOCHS = n
USED_DEVICES = []
def objective(trial):
time.sleep(random.uniform(0, 2)) #used because all n_jobs start at the same time
gpu_list = list(range(torch.cuda.device_count())
unused_gpus = [x for x in gpu_list if x not in USED_DEVICES]
idx = random.choice(unused_gpus)
USED_DEVICES.append(idx)
unused_gpus.remove(idx)
DEVICE = f"cuda:{idx}"
model = define_model(trial).to(DEVICE)
#... YOUR CODE ...
for epoch in range(EPOCHS):
# ... YOUR CODE ...
if trial.should_prune():
USED_DEVICES.remove(idx)
raise optuna.exceptions.TrialPruned()
#remove idx from list to reuse in next trial
USED_DEVICES.remove(idx)