No `predict_dataloader()` method defined to run `Trainer.predict`



I am trying to get predictions out of my model based on the test set dataloader (I want to save both x and y_hat, which I need later for testing).

I tried:

my_results = trainer.predict(model = model, datamodule=dm)

My code contains the following:

import numpy as np
import torch
from torch.utils.data import Dataset

class TimeseriesDataset(Dataset):
    '''
    Custom Dataset subclass.
    Serves as input to DataLoader to transform X
    into sequence data using a rolling window.
    A DataLoader using this dataset will output batches
    of `(batch_size, seq_len, n_features)` shape.
    Suitable as an input to RNNs.
    '''
    def __init__(self, X: np.ndarray, y: np.ndarray, seq_len: int = 1):
        self.X = torch.tensor(X).float()
        self.y = torch.tensor(y).float()
        self.seq_len = seq_len

    def __len__(self):
        return self.X.__len__() - (self.seq_len - 1)

    def __getitem__(self, index):
        return (self.X[index:index+self.seq_len], self.y[index+self.seq_len-1])
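As a quick sanity check of the rolling window (random data; the sizes here are just for illustration, not my real data):

# Hypothetical example: 100 rows, 3 features, rolling window of 5.
X_demo = np.random.rand(100, 3)
y_demo = np.random.rand(100, 2)
ds = TimeseriesDataset(X_demo, y_demo, seq_len=5)
x0, y0 = ds[0]
print(len(ds), x0.shape, y0.shape)  # 96 torch.Size([5, 3]) torch.Size([2])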

import torch.nn as nn
import pytorch_lightning as pl

class LSTMRegressor(pl.LightningModule):
    '''
    Standard PyTorch Lightning module:
    https://pytorch-lightning.readthedocs.io/en/latest/lightning_module.html
    '''
    def __init__(self,
                 n_features,
                 hidden_size,
                 seq_len,
                 batch_size,
                 num_layers,
                 dropout,
                 learning_rate,
                 criterion):
        super().__init__()
        self.n_features = n_features
        self.hidden_size = hidden_size
        self.seq_len = seq_len
        self.batch_size = batch_size
        self.num_layers = num_layers
        self.dropout = dropout
        self.criterion = criterion
        self.learning_rate = learning_rate
        self.lstm = nn.LSTM(input_size=n_features,
                            hidden_size=hidden_size,
                            num_layers=num_layers,
                            dropout=dropout,
                            batch_first=True)
        self.linear = nn.Linear(hidden_size, 2)

    def forward(self, x):
        # lstm_out shape: (batch_size, seq_len, hidden_size)
        lstm_out, _ = self.lstm(x)
        # regress on the hidden state of the last time step
        y_pred = self.linear(lstm_out[:, -1])
        return y_pred

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=self.learning_rate)

    def training_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self(x)
        loss = self.criterion(y_hat, y)
        self.log('train_loss', loss)
        return loss

    def predict_step(self, batch, batch_idx):
        with torch.no_grad():
            x, y = batch
            y_hat = self(x)
        return x, y_hat

    def validation_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self(x)
        loss = self.criterion(y_hat, y)
        self.log('val_loss', loss)
        # checkpoint_callback = ModelCheckpoint(
        #     monitor='val_loss',
        #     dirpath='./lstm',
        #     filename='lstm{epoch:02d}-val_loss{val/loss:.2f}',
        #     auto_insert_metric_name=False
        # )
        return loss

    def test_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self(x)
        loss = self.criterion(y_hat, y)
        self.log('test_loss', loss)
        # TODO check if loss is the thing to return in this function
        return loss

And:

import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from torch.utils.data import DataLoader

class CryptoDataModule(pl.LightningDataModule):
    '''
    PyTorch Lightning DataModule subclass:
    https://pytorch-lightning.readthedocs.io/en/latest/datamodules.html
    Serves the purpose of aggregating all data loading
    and processing work in one place.
    '''
    def __init__(self, seq_len=1, batch_size=128, num_workers=0):
        super().__init__()
        self.seq_len = seq_len
        self.batch_size = batch_size
        self.num_workers = num_workers
        self.X_train = None
        self.y_train = None
        self.X_val = None
        self.y_val = None
        self.X_test = None
        self.y_test = None  # was mistakenly `self.X_test = None` a second time
        self.columns = None
        self.preprocessing = None

    def setup(self, stage=None):
        '''
        Data is resampled to hourly intervals.
        Both 'nan' and '?' are converted to np.nan.
        'Date' and 'Time' columns are merged into a 'dt' index.
        '''
        if stage == 'fit' and self.X_train is not None:
            return
        # `and` binds tighter than `or`, so the original unparenthesized
        # condition returned whenever stage == 'test', even with no data loaded
        if stage in ('test', 'predict') and self.X_test is not None:
            return
        if stage is None and self.X_train is not None and self.X_test is not None:
            return

        path = './eth_data_1d.csv'

        df = pd.read_csv(
            path,
            sep=',',
            infer_datetime_format=True,
            low_memory=False,
            na_values=['nan', '?'],
            index_col='Time'
        )

        y = pd.concat([df['Top'], df['Btm']], axis=1, keys=['Top', 'Btm'])
        X = df.dropna().copy()
        self.columns = X.columns
        X_cv, X_test, y_cv, y_test = train_test_split(
            X, y, test_size=0.2, shuffle=False
        )
        X_train, X_val, y_train, y_val = train_test_split(
            X_cv, y_cv, test_size=0.25, shuffle=False
        )
        preprocessing = StandardScaler()
        preprocessing.fit(X_train)
        self.X_train = preprocessing.transform(X_train)
        self.y_train = y_train.values.reshape((-1, 2))
        self.X_val = preprocessing.transform(X_val)
        self.y_val = y_val.values.reshape((-1, 2))
        self.X_test = preprocessing.transform(X_test)
        self.y_test = y_test.values.reshape((-1, 2))

    def train_dataloader(self):
        train_dataset = TimeseriesDataset(self.X_train,
                                          self.y_train,
                                          seq_len=self.seq_len)
        return DataLoader(train_dataset,
                          batch_size=self.batch_size,
                          shuffle=False,
                          num_workers=self.num_workers)

    def val_dataloader(self):
        val_dataset = TimeseriesDataset(self.X_val,
                                        self.y_val,
                                        seq_len=self.seq_len)
        return DataLoader(val_dataset,
                          batch_size=self.batch_size,
                          shuffle=False,
                          num_workers=self.num_workers)

    def test_dataloader(self):
        test_dataset = TimeseriesDataset(self.X_test,
                                         self.y_test,
                                         seq_len=self.seq_len)
        return DataLoader(test_dataset,
                          batch_size=self.batch_size,
                          shuffle=False,
                          num_workers=self.num_workers)
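For completeness, this is roughly how everything is wired together; the hyperparameter values below are placeholders, not my real config:

# Hypothetical wiring; all values here are placeholders.
dm = CryptoDataModule(seq_len=24, batch_size=128)
model = LSTMRegressor(n_features=8,
                      hidden_size=64,
                      seq_len=24,
                      batch_size=128,
                      num_layers=2,
                      dropout=0.2,
                      learning_rate=1e-3,
                      criterion=nn.MSELoss())
trainer = pl.Trainer(max_epochs=10)
trainer.fit(model, datamodule=dm)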

The trainer.predict call above gives me the following error:

MisconfigurationException                 Traceback (most recent call last)
/Users/xxx/ai_bt/model.ipynb Cell 22 in <cell line: 34>()
1 # train on test set too! : see below
2 # trainer.test(dataloaders=test_dataloaders)
3 
(...)
30 # with torch.no_grad():
31     # predictions = trainer.predict(model, dm)
---> 34 my_results = trainer.predict(model = model, datamodule=dm)
File /opt/homebrew/lib/python3.9/site-packages/pytorch_lightning/trainer/trainer.py:1025, in Trainer.predict(self, model, dataloaders, datamodule, return_predictions, ckpt_path)
1000 r"""
1001 Run inference on your data.
1002 This will call the model forward function to compute predictions. Useful to perform distributed
(...)
1022     Returns a list of dictionaries, one for each provided dataloader containing their respective predictions.
1023 """
1024 self.strategy.model = model or self.lightning_module
-> 1025 return self._call_and_handle_interrupt(
1026     self._predict_impl, model, dataloaders, datamodule, return_predictions, ckpt_path
1027 )
File /opt/homebrew/lib/python3.9/site-packages/pytorch_lightning/trainer/trainer.py:723, in Trainer._call_and_handle_interrupt(self, trainer_fn, *args, **kwargs)
721         return self.strategy.launcher.launch(trainer_fn, *args, trainer=self, **kwargs)
...
--> 197     raise MisconfigurationException(f"No `{loader_name}()` method defined to run `Trainer.{trainer_method}`.")
199 # predict_step is not required to be overridden
200 if stage == "predict":
MisconfigurationException: No `predict_dataloader()` method defined to run `Trainer.predict`.

It must be something silly, because I can't figure out which dataloader it is referring to. The dm argument works fine for training...

UPDATE: based on @Mikel B's answer, I added:

def predict_dataloader(self):
    predict_dataset = TimeseriesDataset(self.X_test,
                                        self.y_test,
                                        seq_len=self.seq_len)
    return DataLoader(predict_dataset,
                      batch_size=self.batch_size,
                      shuffle=False,
                      num_workers=self.num_workers)

which results in:

---> 44     output = model(batch)
45     output = model.proba(output) # if not part of forward already
46     prediction_list.append(output)
File /opt/homebrew/lib/python3.9/site-packages/torch/nn/modules/module.py:1130, in Module._call_impl(self, *input, **kwargs)
1126 # If we don't have any hooks, we want to skip the rest of the logic in
1127 # this function, and just call forward.
1128 if not (self._backward_hooks or self._forward_hooks or self._forward_pre_hooks or _global_backward_hooks
1129         or _global_forward_hooks or _global_forward_pre_hooks):
-> 1130     return forward_call(*input, **kwargs)
1131 # Do not call functions when jit is used
1132 full_backward_hooks, non_full_backward_hooks = [], []
/Users/user/ai_bt/model.ipynb Cell 22 in LSTMRegressor.forward(self, x)
32 def forward(self, x):
33     # lstm_out = (batch_size, seq_len, hidden_size)
---> 34     lstm_out, _ = self.lstm(x)
35     y_pred = self.linear(lstm_out[:,-1])
36     return y_pred
...
--> 731     is_batched = input.dim() == 3
732     batch_dim = 0 if self.batch_first else 1
733     if not is_batched:
AttributeError: 'list' object has no attribute 'dim'
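The traceback shows the whole batch (an [x, y] list) being passed into forward, so nn.LSTM receives a Python list instead of a 3-D tensor. A minimal sketch of a manual loop that unpacks the batch first (assuming the CryptoDataModule and predict_dataloader above):

# Sketch: unpack each (x, y) pair before calling the model.
dm.setup(stage='predict')
model.eval()
prediction_list = []
with torch.no_grad():
    for x, y in dm.predict_dataloader():
        y_hat = model(x)  # x has shape (batch_size, seq_len, n_features)
        prediction_list.append((x, y_hat))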

You have not defined a predict_dataloader() in your LightningDataModule:

from typing import Optional

import pytorch_lightning as pl
from torch.utils.data import DataLoader, random_split
from torchvision.datasets import MNIST

class MNISTDataModule(pl.LightningDataModule):
    def __init__(self, data_dir: str = "path/to/dir", batch_size: int = 32):
        super().__init__()
        self.data_dir = data_dir
        self.batch_size = batch_size

    def setup(self, stage: Optional[str] = None):
        self.mnist_test = MNIST(self.data_dir, train=False)
        self.mnist_predict = MNIST(self.data_dir, train=False)
        mnist_full = MNIST(self.data_dir, train=True)
        self.mnist_train, self.mnist_val = random_split(mnist_full, [55000, 5000])

    def train_dataloader(self):
        return DataLoader(self.mnist_train, batch_size=self.batch_size)

    def val_dataloader(self):
        return DataLoader(self.mnist_val, batch_size=self.batch_size)

    def test_dataloader(self):
        return DataLoader(self.mnist_test, batch_size=self.batch_size)

    # THIS IS WHAT YOU ARE MISSING
    def predict_dataloader(self):
        return DataLoader(self.mnist_predict, batch_size=self.batch_size)

    def teardown(self, stage: Optional[str] = None):
        # Used to clean up when the run is finished
        ...

Without this method, the trainer does not know which data to load for predict_step.
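With predict_dataloader() defined, the call can then look like this minimal sketch (names reused from the question):

# trainer.predict resolves predict_dataloader() from the datamodule
# and runs the module's predict_step on every batch.
trainer = pl.Trainer()
predictions = trainer.predict(model=model, datamodule=dm)
# `predictions` is a list with one (x, y_hat) tuple per batch,
# matching what predict_step returns.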
