使用PyTorchLightning在多个GPU的DDP模式下运行测试计算



我有一个模型,我试图在DDP模式下与培训师一起使用。

import pytorch_lightning as pl
import torch
import torchvision
from torchmetrics import Accuracy
class Model(pl.LightningModule):
def __init__(
self,
model_name: str,
num_classes: int,
model_hparams: Dict["str", Union[str, int]],
optimizer_name: str,
optimizer_hparams: Dict["str", Union[str, int]],
):
super().__init__()
self.save_hyperparameters()
self.model = torchvision.resnet18(num_classes=num_classes, **model_hparams)
self.loss_module = CrossEntropyLoss()
self.example_input_array = torch.zeros((1, 3, 512, 512), dtype=torch.float32)
# Trying to use in DDP mode
self.test_accuracy = Accuracy(num_classes=num_classes)
def forward(self, imgs) -> Tensor:
return self.model(imgs)
# <redacted training_*, val_*, etc. as they are not relevant>
def test_step(self, batch, batch_idx):
imgs, labels = batch
preds = self.model(imgs)
self.test_accuracy.update(preds, labels)
return labels, preds.argmax(dim=-1)
def test_epoch_end(self, outputs) -> None:
num_classes = self.hparams.num_classes
# Creates table of correct and incorrect predictions
results = torch.zeros((num_classes, num_classes))
for output in outputs:
for label, prediction in zip(*output):
results[int(label), int(prediction)] += 1
# Total accuracy. This and `compute` are identical in 1 GPU training
acc = results.diag().sum() / results.sum()
self.log("test_acc", self.test_accuracy.compute())
print(results) # This prints twice

和培训师

trainer = pl.Trainer(
gpus=torch.cuda.device_count(),
max_epochs=180,
callbacks=callbacks,
strategy="ddp",
auto_scale_batch_size="binsearch",
)

然而,我从test中获得了打印

tensor([[0., 0., 0., 0., 0., 5.],
[0., 7., 0., 0., 0., 0.],
[0., 3., 0., 0., 0., 2.],
[0., 3., 0., 0., 0., 0.],
[0., 3., 0., 0., 0., 2.],
[0., 1., 0., 0., 0., 4.]])tensor([[0., 0., 0., 0., 0., 6.],
[0., 2., 0., 0., 0., 0.],
[0., 4., 0., 0., 0., 2.],
[0., 2., 0., 0., 0., 1.],
[0., 3., 0., 0., 0., 2.],
[0., 5., 0., 0., 0., 3.]])

还有

trainer.fit(model, datamodule=datamodule)
test_results = trainer.test(model, datamodule=datamodule)
print(test_results) 
# [{'test_acc': 0.18333333730697632}]
# [{'test_acc': 0.18333333730697632}]

其中我只期望打印单个张量。我如何对所有测试预测进行计算,而不是通过GPU进行计算,并根据这些预测返回我在test_epoch_end中创建的表?我把文档解释为*_epoch_end只在单个GPU上执行,我很失落。

我认为您应该使用以下技术:

  • test_epoch_end:在ddp模式下,每个gpu在该方法中运行相同的代码。因此,每个gpu计算部分批次的度量,而不是整个批次的度量。您需要将度量和收集同步到rank==0gpu以进行计算整个数据集的评估度量。

  • torch.distributed.reduce:该方法收集并计算分布式gpu设备上的张量。(文档(

  • self.trainer.is_global_zero:对于秩==0 ,此标志将为真

  • 在测试集上手动计算度量的最佳方式是什么?你应该检查文档

使用上述技术,您可以计算整个数据集的度量,并在.test之后使用results张量。以下是片段:

import os
import torch
from torch.utils.data import DataLoader
from torchvision import models, transforms
from torchvision.datasets import CIFAR10
from pytorch_lightning import LightningModule, LightningDataModule, Trainer
os.environ['CUDA_DEVICE_ORDER'] = 'PCI_BUS_ID'

class CIFAR(LightningDataModule):
def __init__(self, img_size=32, batch_size=32):
super().__init__()
self.img_size = img_size if isinstance(img_size, tuple) else (img_size, img_size)
self.batch_size = batch_size
self.test_transforms = transforms.Compose([
transforms.Resize(self.img_size),
transforms.CenterCrop(self.img_size),
transforms.ToTensor(),
transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5))
])
def prepare_data(self) -> None:
CIFAR10(root='data', train=True, download=True)
CIFAR10(root='data', train=False, download=True)

def setup(self, stage=None):
self.test_ds = CIFAR10(root='data', train=False, download=False, transform=self.test_transforms)
def test_dataloader(self):
return DataLoader(self.test_ds, num_workers=4, batch_size=self.batch_size, shuffle=False)

class BasicModule(LightningModule):
def __init__(self):
super().__init__()
self.model = models.resnet18(num_classes=10, pretrained=False)
def test_step(self, batch, batch_idx):
x, y = batch
y_hat = self.model(x)
return y, y_hat.argmax(dim=-1)
def test_epoch_end(self, outputs):
results = torch.zeros((10, 10)).to(self.device)
for output in outputs:
for label, prediction in zip(*output):
results[int(label), int(prediction)] += 1
torch.distributed.reduce(results, 0, torch.distributed.ReduceOp.SUM)
acc = results.diag().sum() / results.sum()
if self.trainer.is_global_zero:
self.log("test_metric", acc, rank_zero_only=True)
self.trainer.results = results


if __name__ == '__main__':
data = CIFAR(batch_size=512)
model = BasicModule()
trainer = Trainer(max_epochs=2, gpus='0,1', strategy="ddp", precision=16)
test_results = trainer.test(model, data)
if trainer.is_global_zero:
print(test_results)
print(trainer.results)

也许这不是您想要的答案,但您可以在文件系统中累积所有设备的输出,并在以后执行计算。

在LightningModule:中定义test_step

def test_step(self, batch, batch_idx):
x, y = batch
y_hat = self.model(x).data.cpu().numpy()
df = pd.DataFrame({'y_hat': y_hat})
device = y_hat.get_device()    # this line binds out file to gpu
df.to_csv(f"out{device}.csv")  # add code to do not overwrite file every time

每个gpu将写入单独的文件。

最新更新