I've trained a DNN that predicts, as a one-hot encoded classification, which state a game is in. Essentially, assume there are three states: 0, 1, or 2. Normally I would use categorical_crossentropy as the loss function, but I realized that not all misclassifications are equal for my states. For example:
- If the model predicts state 1 and that classification is wrong, there is no cost to my system, because state 1 basically does nothing, so reward 0x.
- If the model correctly predicts state 0 or 2 (e.g. prediction = 2 and correct = 2), then the reward should be 3x.
- If the model incorrectly predicts state 0 or 2 (e.g. prediction = 2 and correct = 0), then the reward should be -1x.
I know we can declare a custom loss function in Keras, but I'm stuck on formulating it. Does anyone have suggestions for how to convert the pseudocode below? I don't know how to express it with vector operations.
**Bonus question:** I think what I'm after is a reward function. Is that the same thing as a loss function? Thanks!

def custom_expectancy(y_expected, y_pred):
    # Get 0, 1 or 2
    expected_norm = tf.argmax(y_expected)
    predicted_norm = tf.argmax(y_pred)
    # Some pseudo code....
    # Now, if predicted == 1
    #     loss += 0
    # elif predicted == expected
    #     loss -= 3
    # elif predicted != expected
    #     loss += 1
    #
    # return loss
Source reference: https://datascience.stackexchange.com/questions/55215/how-do-i-create-a-keras-custom-loss-function-for-a-one-hot-encoded-binary-classi (Keras custom loss for a one-hot encoded softmax output)
**Code update:**
import tensorflow as tf

def custom_expectancy(y_expected, y_pred):
    # Get 0, 1 or 2
    expected_norm = tf.argmax(y_expected, axis=-1)
    predicted_norm = tf.argmax(y_pred, axis=-1)
    results = tf.unstack(expected_norm)
    # Some pseudo code....
    # Now, if predicted == 1
    #     loss += 0
    # elif predicted == expected
    #     loss += 3
    # elif predicted != expected
    #     loss -= 1
    for idx in range(len(results)):
        predicted = predicted_norm[idx]
        expected = expected_norm[idx]
        if predicted == 1:            # do nothing
            results[idx] = 0.0
        elif predicted == expected:   # reward
            results[idx] = 3.0
        else:                         # wrong, so we lost
            results[idx] = -1.0
    return tf.stack(results)
I think this is what I'm after, but I haven't quite figured out how to build the correct tensor (it should be of batch size) to return.
Here is a good article explaining the concepts of loss functions and cost functions; its multiple answers illustrate how different authors in the machine learning field treat them.

For the loss function, you may find the implementation below useful. It implements a weighted cross-entropy loss, where each class is weighted proportionally. It can be adjusted to meet the constraints specified above.
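The referenced implementation is not reproduced in this thread, so here is a minimal sketch of what such a class-weighted categorical cross-entropy could look like; the weight values are placeholder assumptions, not part of the original answer:

import tensorflow as tf

def weighted_categorical_crossentropy(class_weights):
    """Cross-entropy where each (one-hot) true class is scaled by its weight."""
    w = tf.constant(class_weights, dtype=tf.float32)

    def loss(y_true, y_pred):
        y_pred = tf.clip_by_value(y_pred, 1e-7, 1.0)  # avoid log(0)
        # y_true is one-hot, so only the true class term survives the sum.
        return -tf.reduce_sum(w * y_true * tf.math.log(y_pred), axis=-1)

    return loss

# Hypothetical weights: make states 0 and 2 matter more than state 1.
loss_fn = weighted_categorical_crossentropy([3.0, 0.5, 3.0])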
The best way to build a conditional custom loss is to use tf.keras.backend.switch, which involves no loops.

In your case, you need to combine two switch conditional expressions to obtain the desired result.
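For reference, a minimal standalone example (values made up for the demo) of how tf.keras.backend.switch selects element-wise between two tensors:

import tensorflow as tf

cond = tf.constant([True, False, True])
print(tf.keras.backend.switch(cond, tf.ones(3), tf.zeros(3)))
# tf.Tensor([1. 0. 1.], shape=(3,), dtype=float32)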
The desired loss function can be reproduced like this:
def custom_expectancy(y_expected, y_pred):
    zeros = tf.cast(tf.reduce_sum(y_pred * 0, axis=-1), tf.float32)  # important to produce a gradient
    y_expected = tf.cast(tf.reshape(y_expected, (-1,)), tf.float32)
    class_pred = tf.argmax(y_pred, axis=-1)
    class_pred = tf.cast(class_pred, tf.float32)
    cond1 = (class_pred != y_expected) & (class_pred != 1)  # wrongly predicted state 0 or 2
    cond2 = (class_pred == y_expected) & (class_pred != 1)  # correctly predicted state 0 or 2
    res1 = tf.keras.backend.switch(cond1, zeros - 1, zeros)
    res2 = tf.keras.backend.switch(cond2, zeros + 3, zeros)
    return res1 + res2
Here cond1 holds when the model incorrectly predicts state 0 or 2, and cond2 holds when the model correctly predicts state 0 or 2. The neutral state returns zero whenever neither cond1 nor cond2 is active.
Note that y_expected can be passed as a simple tensor/array of integer-encoded states (no need to one-hot encode them).
The loss function works like this:
true = tf.constant([[1], [2], [1], [0]])  # no need to one-hot
pred = tf.constant([[0, 1, 0], [0, 0, 1], [0, 0, 1], [0, 1, 0]])
custom_expectancy(true, pred)
which returns:
<tf.Tensor: shape=(4,), dtype=float32, numpy=array([ 0., 3., -1., 0.], dtype=float32)>
That matches what we need.
To use the loss in a model:
import numpy as np
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense

X = np.random.uniform(0, 1, (1000, 10))
y = np.random.randint(0, 3, (1000,))  # no need to one-hot
model = Sequential([Dense(3, activation='softmax')])
model.compile(optimizer='adam', loss=custom_expectancy)
model.fit(X, y, epochs=3)
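A side note on the zeros term (this check is my own addition, not part of the original answer): because zeros is computed from y_pred, the returned loss stays connected to the model output, so TensorFlow can take gradients at all. You can verify this with tf.GradientTape:

import tensorflow as tf

true = tf.constant([[1], [2], [1], [0]])
pred = tf.Variable([[0.1, 0.8, 0.1],
                    [0.2, 0.2, 0.6],
                    [0.1, 0.1, 0.8],
                    [0.6, 0.2, 0.2]])
with tf.GradientTape() as tape:
    loss = tf.reduce_sum(custom_expectancy(true, pred))
# A defined tensor (all zeros away from decision boundaries, since the loss
# depends on y_pred only through argmax); without the zeros term this would be None.
print(tape.gradient(loss, pred))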
You can do it like this. If your ground truth y_true is dense (shape N3), you can use tf.reduce_all(y_true == [0.0, 0.0, 1.0], axis=-1, keepdims=True) and tf.reduce_all(y_true == [1.0, 0.0, 0.0], axis=-1, keepdims=True) to control the if/elif/else. Alternatively, you can use tf.gather.
import tensorflow as tf

def sparse_loss(y_true, y_pred):
    """Calculate loss for game. Follows keras loss signature.

    Args:
      y_true: Sparse tensor of shape N1, where the correct prediction
        is encoded as 0, 1, or 2.
      y_pred: Tensor of shape N3. For each row, the three columns
        represent the predicted probability of each state.
        For example, [0.1, 0.4, 0.5] means, "There's a 10% chance the
        right state is 0, a 40% chance it is 1, and a 50% chance it is 2."
    """
    # This is the unvectorized implementation on individual rows, which is
    # more intuitive. But TF requires vectorization.
    #
    # if y_true == 0:
    #     # Value matrix is shape 3. Broadcasting will occur.
    #     return -tf.reduce_sum(y_pred * [3.0, 0.0, -1.0])
    # elif y_true == 2:
    #     return -tf.reduce_sum(y_pred * [-1.0, 0.0, 3.0])
    # else:
    #     # According to the rules, this is never the correct state to
    #     # predict, so it should never show up.
    #     assert False, f'Impossible state reached. y_true: {y_true}, y_pred: {y_pred}.'

    # We vectorize by calculating the reward for all predictions for two cases:
    # y_true is zero, or y_true is two. To eliminate this inefficiency, we
    # could use tf.gather to build an N3 shaped matrix to multiply against.
    reward_for_true_zero = tf.reduce_sum(y_pred * [3.0, 0.0, -1.0], axis=-1, keepdims=True)  # N1
    reward_for_true_two = tf.reduce_sum(y_pred * [-1.0, 0.0, 3.0], axis=-1, keepdims=True)   # N1
    reward = tf.where(y_true == 0.0, reward_for_true_zero, reward_for_true_two)  # N1
    return -tf.reduce_sum(reward)
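The comment in the vectorized section hints at a tf.gather variant; here is a hedged sketch of what that could look like (my own construction, with a hypothetical value_matrix that encodes the rules: row i holds the per-class rewards when the true state is i):

import tensorflow as tf

# Row i gives the reward for each predicted class when the true state is i.
# Row 1 is all zeros because, per the question's rules, state 1 never pays
# out (and per the answer above it never appears as ground truth anyway).
value_matrix = tf.constant([[ 3.0, 0.0, -1.0],   # y_true == 0
                            [ 0.0, 0.0,  0.0],   # y_true == 1
                            [-1.0, 0.0,  3.0]])  # y_true == 2

def gather_loss(y_true, y_pred):
    """Same loss as sparse_loss, but one gather + multiply, no tf.where."""
    idx = tf.reshape(tf.cast(y_true, tf.int32), (-1,))  # N
    rows = tf.gather(value_matrix, idx)                 # N3
    reward = tf.reduce_sum(y_pred * rows, axis=-1)      # N
    return -tf.reduce_sum(reward)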