How to make a multi-output TensorFlow 2 custom training loop for both regression and classification?



I made a minimal reproducible example with the Iris dataset. I built a complete neural network that predicts the last column of the Iris features. I would also like to output the target (the class). So the network has to minimize two different loss functions (one continuous, one categorical). In the example below, everything is set up for the continuous target. But how do I turn this into a multi-output problem?

import tensorflow as tf
from tensorflow.keras.layers import Dense
from tensorflow.keras import Model
from sklearn.datasets import load_iris
tf.keras.backend.set_floatx('float64')

iris, target = load_iris(return_X_y=True)

X = iris[:, :3]
y = iris[:, 3]
z = target

ds = tf.data.Dataset.from_tensor_slices((X, y, z)).batch(8)

class MyModel(Model):
    def __init__(self):
        super(MyModel, self).__init__()
        self.d0 = Dense(16, activation='relu')
        self.d1 = Dense(32, activation='relu')
        self.d2 = Dense(1)

    def call(self, x):
        x = self.d0(x)
        x = self.d1(x)
        x = self.d2(x)
        return x

model = MyModel()

loss_object = tf.keras.losses.MeanAbsoluteError()
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)

loss = tf.keras.metrics.Mean(name='categorical loss')
error = tf.keras.metrics.MeanAbsoluteError()

@tf.function
def train_step(inputs, target):
    with tf.GradientTape() as tape:
        output = model(inputs)
        run_loss = loss_object(target, output)
    gradients = tape.gradient(run_loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    loss(run_loss)
    error(target, output)

for epoch in range(50):
    for xx, yy, zz in ds:  # what to do with zz, the categorical target?
        train_step(xx, yy)
    template = 'Epoch {:>2}, MAE: {:>5.2f}'
    print(template.format(epoch+1, loss.result()))
    loss.reset_states()
    error.reset_states()

You can pass a list of losses to tape.gradient, like so:

with tf.GradientTape() as tape:
    pred_reg, pred_cat = model(inputs)
    reg_loss = loss_obj_reg(y_reg, pred_reg)
    cat_loss = loss_obj_cat(y_cat, pred_cat)
gradients = tape.gradient([reg_loss, cat_loss], model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
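
Note that when tape.gradient is given a list of targets, it differentiates their sum, so both losses are minimized jointly with equal weight.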

Full example:

import tensorflow as tf
from tensorflow.keras.layers import Dense
from tensorflow.keras import Model
from sklearn.datasets import load_iris

iris, target = load_iris(return_X_y=True)

X = tf.cast(iris[:, :3], tf.float32)
y = tf.cast(iris[:, 3], tf.float32)
z = target

ds = tf.data.Dataset.from_tensor_slices((X, y, z)).shuffle(150).batch(8)

class MyModel(Model):
    def __init__(self):
        super(MyModel, self).__init__()
        self.d0 = Dense(16, activation='relu')
        self.d1 = Dense(32, activation='relu')
        self.d2 = Dense(1)
        self.d3 = Dense(3, activation='softmax')

    def call(self, x, training=None, **kwargs):
        x = self.d0(x)
        x = self.d1(x)
        a = self.d2(x)
        b = self.d3(x)
        return a, b

model = MyModel()

loss_obj_reg = tf.keras.losses.MeanAbsoluteError()
loss_obj_cat = tf.keras.losses.SparseCategoricalCrossentropy()

optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)

loss_reg = tf.keras.metrics.Mean(name='regression loss')
loss_cat = tf.keras.metrics.Mean(name='categorical loss')

error_reg = tf.keras.metrics.MeanAbsoluteError()
error_cat = tf.keras.metrics.SparseCategoricalAccuracy()

@tf.function
def train_step(inputs, y_reg, y_cat):
    with tf.GradientTape() as tape:
        pred_reg, pred_cat = model(inputs)
        reg_loss = loss_obj_reg(y_reg, pred_reg)
        cat_loss = loss_obj_cat(y_cat, pred_cat)
    gradients = tape.gradient([reg_loss, cat_loss], model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    loss_reg(reg_loss)
    loss_cat(cat_loss)
    error_reg(y_reg, pred_reg)
    error_cat(y_cat, pred_cat)

template = ('Epoch {:>3}, SCCE: {:>5.2f},'
            ' MAE: {:>4.2f}, SAcc: {:>5.1%}')

for epoch in range(150):
    for xx, yy, zz in ds:
        train_step(xx, yy, zz)
    if (epoch + 1) % 10 == 0:
        print(template.format(epoch+1,
                              loss_cat.result(),
                              error_reg.result(),
                              error_cat.result()))
    loss_reg.reset_states()
    loss_cat.reset_states()
    error_reg.reset_states()
    error_cat.reset_states()
Epoch  10, SCCE:  1.41, MAE: 0.36, SAcc: 33.3%
Epoch  20, SCCE:  1.14, MAE: 0.31, SAcc: 44.0%
Epoch  30, SCCE:  1.05, MAE: 0.26, SAcc: 41.3%
Epoch  40, SCCE:  0.99, MAE: 0.21, SAcc: 40.0%
Epoch  50, SCCE:  0.94, MAE: 0.19, SAcc: 40.0%
Epoch  60, SCCE:  0.88, MAE: 0.18, SAcc: 40.0%
Epoch  70, SCCE:  0.83, MAE: 0.17, SAcc: 44.7%
Epoch  80, SCCE:  0.77, MAE: 0.17, SAcc: 75.3%
Epoch  90, SCCE:  0.70, MAE: 0.17, SAcc: 76.7%
Epoch 100, SCCE:  0.64, MAE: 0.17, SAcc: 82.7%
Epoch 110, SCCE:  0.58, MAE: 0.16, SAcc: 82.7%
Epoch 120, SCCE:  0.54, MAE: 0.16, SAcc: 88.0%
Epoch 130, SCCE:  0.50, MAE: 0.16, SAcc: 88.7%
Epoch 140, SCCE:  0.47, MAE: 0.16, SAcc: 90.7%
Epoch 150, SCCE:  0.45, MAE: 0.16, SAcc: 90.0%

From this output you can see that both losses are being minimized.
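
If the two losses end up on very different scales, you could also weight them before differentiating instead of passing the plain list. A minimal sketch, reusing the model and loss objects from the example above (the weights here are illustrative, not tuned values from this answer):

@tf.function
def weighted_train_step(inputs, y_reg, y_cat, w_reg=1.0, w_cat=0.5):
    # Same as train_step above, but with an explicit weighted sum of losses
    with tf.GradientTape() as tape:
        pred_reg, pred_cat = model(inputs)
        total_loss = (w_reg * loss_obj_reg(y_reg, pred_reg)
                      + w_cat * loss_obj_cat(y_cat, pred_cat))
    gradients = tape.gradient(total_loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))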

You can do the following. I'm assuming you just need a single multi-output network here, so I'll create a model that looks like the one below. But even if you need two separate models, you should be able to port this easily.

              x
              | Dense(16)
              x
              | Dense(32)
              x
    Dense(1) / \ Dense(4, softmax)
            /   \
   (cont) y_1   y_2 (categorical)
import tensorflow as tf
from tensorflow.keras.layers import Dense
from tensorflow.keras import Model
from sklearn.datasets import load_iris
from tensorflow.keras.utils import to_categorical
import tensorflow.keras.backend as K
tf.keras.backend.set_floatx('float64')
import numpy as np

iris, target = load_iris(return_X_y=True)

K.clear_session()

X = iris[:, :3]
y = iris[:, 3]
z = target

ds = tf.data.Dataset.from_tensor_slices((X, y, z)).shuffle(buffer_size=150).batch(32)

class MyModel(Model):
    def __init__(self):
        super(MyModel, self).__init__()
        self.d0 = Dense(16, activation='relu')
        self.d1 = Dense(32, activation='relu')
        self.d2_1 = Dense(1)
        self.d2_2 = Dense(4, activation='softmax')

    def call(self, x):
        x = self.d0(x)
        x = self.d1(x)
        y_1 = self.d2_1(x)
        y_2 = self.d2_2(x)
        return y_1, y_2

model = MyModel()

loss_objects = [tf.keras.losses.MeanAbsoluteError(),
                tf.keras.losses.SparseCategoricalCrossentropy()]
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)

acc = tf.keras.metrics.Accuracy(name='categorical loss')
loss = tf.keras.metrics.MeanAbsoluteError()

@tf.function
def train_step(inputs, targets):
    with tf.GradientTape() as tape:
        outputs = model(inputs)
        # One loss per output head; tape.gradient sums them when given a list
        losses = [l(t, o) for l, o, t in zip(loss_objects, outputs, targets)]
    gradients = tape.gradient(losses, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    return outputs

for epoch in range(50):
    for xx, yy, zz in ds:
        outs = train_step(xx, [yy, zz])
        res1 = acc.update_state(zz, np.argmax(outs[1], axis=1))
        res2 = loss.update_state(yy, outs[0])
    template = 'Epoch {:>2}, Accuracy: {:>5.2f}, MAE: {:>5.2f}'
    print(template.format(epoch+1, acc.result(), loss.result()))
    acc.reset_states()
    loss.reset_states()
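
Once trained, inference is a single forward pass that returns both heads. A quick sketch (the input values are made up):

x_new = tf.constant([[5.1, 3.5, 1.4]], dtype=tf.float64)  # one made-up sample
pred_cont, pred_cat = model(x_new)
print(float(pred_cont[0, 0]))          # continuous prediction
print(np.argmax(pred_cat, axis=1)[0])  # predicted class index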

To tackle this multi-task learning problem, the following modules are imported.

import tensorflow as tf
from tensorflow.keras.layers import Dense
from tensorflow.keras import Model
from sklearn.datasets import load_iris
from tensorflow.keras.utils import to_categorical
import tensorflow.keras.backend as K
tf.keras.backend.set_floatx('float64')
import numpy as np
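
The training loop further down iterates over a dataset ds, which is assumed to be built from Iris exactly as in the earlier examples; for completeness, a minimal sketch:

iris, target = load_iris(return_X_y=True)
X = iris[:, :3]  # first three features as network inputs
y = iris[:, 3]   # fourth feature as the continuous target
z = target       # class labels as the categorical target
ds = tf.data.Dataset.from_tensor_slices((X, y, z)).shuffle(150).batch(32)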

Then we define a multi-output network, shaped like this:

                x
                | Dense(16)
                x
                | Dense(32)
                x
      Dense(1) / \ Dense(4, softmax)
              /   \
(continuous) y_cont  y_cat (categorical)

In code:

class MyModel(Model):
    def __init__(self):
        super(MyModel, self).__init__()
        self.d0 = Dense(16, activation='relu')
        self.d1 = Dense(32, activation='relu')
        self.cont = Dense(1)                       # Continuous output
        self.cat = Dense(4, activation='softmax')  # Categorical output

    def call(self, x):
        x = self.d0(x)
        x = self.d1(x)
        y_cont = self.cont(x)
        y_cat = self.cat(x)
        return y_cont, y_cat

model = MyModel()

Next, we define the loss functions and the optimizer. We use joint training: the loss is the sum of the mean absolute error for the continuous variable and the cross-entropy for the categorical variable.

cont_loss_func = tf.keras.losses.MeanAbsoluteError()
cat_loss_func = tf.keras.losses.SparseCategoricalCrossentropy()

def cont_cat_loss_func(real_cont, pred_cont, real_cat, pred_cat):
    return cat_loss_func(real_cat, pred_cat) + cont_loss_func(real_cont, pred_cont)

optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)

The training step is defined as follows:

@tf.function
def train_step(inputs, target_cont, target_cat):
    with tf.GradientTape() as tape:
        # Forward pass
        output_cont, output_cat = model(inputs)
        # Compute the joint loss
        total_loss = cont_cat_loss_func(target_cont, output_cont, target_cat, output_cat)
    # Backpropagation
    gradients = tape.gradient(total_loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    return output_cont, output_cat

We train the network for 50 epochs, and the model's performance is reported during training.

# Model performance
acc_res = tf.keras.metrics.Accuracy()
mae_res = tf.keras.metrics.MeanAbsoluteError()

for epoch in range(50):
    for xx, yy, zz in ds:
        out_cont, out_cat = train_step(xx, yy, zz)
        res1 = acc_res.update_state(zz, np.argmax(out_cat, axis=1))
        res2 = mae_res.update_state(yy, out_cont)
    template = 'Epoch {:>2}, Accuracy: {:>5.2f}, MAE: {:>5.2f}'
    print(template.format(epoch+1, acc_res.result(), mae_res.result()))
    acc_res.reset_states()
    mae_res.reset_states()

Instead of joint training (i.e., summing the losses of the continuous and categorical variables), @thushv89 uses a different way to compute the network's loss. But I don't quite understand how it works.

loss_objects = [tf.keras.losses.MeanAbsoluteError(), tf.keras.losses.SparseCategoricalCrossentropy()]
losses = [l(t, o) for l,o,t in zip(loss_objects, outputs, targets)]
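
As far as I can tell, when tape.gradient receives a list of targets it differentiates their sum, so this per-output list of losses ends up equivalent to joint training with unit weights. A minimal sketch to check the behaviour (the variable names are just illustrative):

import tensorflow as tf

w = tf.Variable(2.0)
with tf.GradientTape(persistent=True) as tape:
    loss_a = 3.0 * w  # d/dw = 3
    loss_b = w * w    # d/dw = 2w = 4
# A list of targets is differentiated as its sum:
print(float(tape.gradient([loss_a, loss_b], w)))  # 7.0
print(float(tape.gradient(loss_a + loss_b, w)))   # 7.0
del tape  # release the persistent tape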
