Keras custom loss for a one-hot encoded classifier



I currently train a DNN that makes one-hot encoded classification predictions for the state a game is in. Essentially, suppose there are three states: 0, 1, or 2.

Now, I would normally use categorical_crossentropy as the loss function, but I realized that not all misclassifications are equal for my states. For example:

  • If the model predicts state 1 and is wrong, there is no cost to my system, because state 1 essentially does nothing, so the reward is 0x.
  • If the model correctly predicts state 0 or 2 (e.g. predicted = 2 and correct = 2), the reward should be 3x.
  • If the model incorrectly predicts state 0 or 2 (e.g. predicted = 2 and correct = 0), the reward should be -1x.

I know we can declare a custom loss function in Keras, but I'm stuck on how to formulate it. Does anyone have suggestions for converting the pseudocode below? I don't know how to express it with vector operations.

Bonus question: I think what I'm really after is a reward function. Is that the same thing as a loss function? Thanks!
def custom_expectancy(y_expected, y_pred):

    # Get 0, 1 or 2
    expected_norm = tf.argmax(y_expected, axis=-1)
    predicted_norm = tf.argmax(y_pred, axis=-1)

    # Some pseudocode....
    # Now, if predicted == 1
    #     loss += 0
    # elif predicted == expected
    #     loss -= 3
    # elif predicted != expected
    #     loss += 1
    #
    # return loss

Source: https://datascience.stackexchange.com/questions/55215/how-do-i-create-a-keras-custom-loss-function-for-a-one-hot-encoded-binary-classi


Code update

import tensorflow as tf

def custom_expectancy(y_expected, y_pred):

    # Get 0, 1 or 2
    expected_norm = tf.argmax(y_expected, axis=-1)
    predicted_norm = tf.argmax(y_pred, axis=-1)

    results = tf.unstack(expected_norm)

    # Some pseudocode....
    # Now, if predicted == 1
    #     loss += 0
    # elif predicted == expected
    #     loss += 3
    # elif predicted != expected
    #     loss -= 1

    for idx in range(0, len(results)):
        predicted = predicted_norm[idx]
        expected = expected_norm[idx]

        if predicted == 1:            # state 1: do nothing
            results[idx] = 0.0
        elif predicted == expected:   # correct prediction of 0 or 2: reward
            results[idx] = 3.0
        else:                         # wrong prediction of 0 or 2: we lose
            results[idx] = -1.0

    return tf.stack(results)

I think this is what I'm after, but I haven't quite figured out how to construct the correct tensor (it should be batch-sized) to return.

Here is a good post explaining the concepts of loss function and cost function, with several answers illustrating how different authors in the machine learning field think about them. In short, the reward function you describe is just a loss function with the sign flipped: maximizing a reward is equivalent to minimizing its negative, which is why the sparse_loss implementation further below returns -tf.reduce_sum(reward).

For the loss function, you may find an implementation of weighted cross-entropy loss useful, where you weight each class proportionally. It can be adjusted to satisfy the constraints specified above.
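Since that implementation isn't reproduced here, the following is a minimal sketch of a weighted categorical cross-entropy, assuming one-hot y_true; the helper name weighted_categorical_crossentropy and the example weights are my own, mapped from the rules above:

import tensorflow as tf

def weighted_categorical_crossentropy(class_weights):
    """Build a loss that scales each sample's cross-entropy by its true class's weight."""
    w = tf.constant(class_weights, dtype=tf.float32)

    def loss(y_true, y_pred):
        # y_true: one-hot, shape (N, num_classes); y_pred: predicted probabilities.
        y_pred = tf.clip_by_value(y_pred, 1e-7, 1.0)                # avoid log(0)
        ce = -tf.reduce_sum(y_true * tf.math.log(y_pred), axis=-1)  # per-sample cross-entropy, (N,)
        return tf.reduce_sum(y_true * w, axis=-1) * ce              # scale by the true class's weight

    return loss

# For example, to emphasize states 0 and 2 and ignore state 1:
# model.compile(optimizer='adam', loss=weighted_categorical_crossentropy([3.0, 0.0, 3.0]))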

The best way to build a conditional custom loss is to use tf.keras.backend.switch, which avoids loops entirely.

In your case, you should combine two switch conditional expressions to obtain the desired result.
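As a minimal illustration of how tf.keras.backend.switch selects elementwise between two tensors (the values here are made up):

import tensorflow as tf

cond = tf.constant([True, False, True])
zeros = tf.zeros(3)
print(tf.keras.backend.switch(cond, zeros + 1.0, zeros))
# tf.Tensor([1. 0. 1.], shape=(3,), dtype=float32)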

The desired loss function can be reproduced like this:

def custom_expectancy(y_expected, y_pred):

    zeros = tf.cast(tf.reduce_sum(y_pred * 0, axis=-1), tf.float32)  ### important to produce gradient
    y_expected = tf.cast(tf.reshape(y_expected, (-1,)), tf.float32)
    class_pred = tf.argmax(y_pred, axis=-1)
    class_pred = tf.cast(class_pred, tf.float32)

    cond1 = (class_pred != y_expected) & (class_pred != 1)
    cond2 = (class_pred == y_expected) & (class_pred != 1)

    res1 = tf.keras.backend.switch(cond1, zeros - 1, zeros)
    res2 = tf.keras.backend.switch(cond2, zeros + 3, zeros)

    return res1 + res2

Here cond1 holds when the model incorrectly predicts state 0 or 2, and cond2 holds when the model correctly predicts state 0 or 2. The default is zeros, which is returned when neither cond1 nor cond2 is active.
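A quick sanity check (my own sketch, reusing the custom_expectancy defined above): the zeros term is what keeps the loss connected to y_pred in the graph, since tf.argmax itself is non-differentiable and without that term tape.gradient would return None:

import tensorflow as tf

logits = tf.Variable(tf.random.normal((4, 3)))
y_true = tf.constant([[1], [2], [1], [0]])

with tf.GradientTape() as tape:
    y_pred = tf.nn.softmax(logits)
    loss = tf.reduce_sum(custom_expectancy(y_true, y_pred))

print(tape.gradient(loss, logits) is not None)  # True: the graph stays connected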

Note that y_expected can be passed as a simple tensor/array of integer-encoded states (no need to one-hot encode them).

The loss function works like this:

true = tf.constant([[1],    [2],    [1],    [0]    ])  ## no need to one-hot
pred = tf.constant([[0,1,0],[0,0,1],[0,0,1],[0,1,0]])
custom_expectancy(true, pred)

which returns:

<tf.Tensor: shape=(4,), dtype=float32, numpy=array([ 0.,  3., -1.,  0.], dtype=float32)>

That seems to match what we need.

Using the loss in a model:

import numpy as np
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense

X = np.random.uniform(0, 1, (1000, 10))
y = np.random.randint(0, 3, (1000,))  ## no need to one-hot
model = Sequential([Dense(3, activation='softmax')])
model.compile(optimizer='adam', loss=custom_expectancy)
model.fit(X, y, epochs=3)

Here is the running notebook.

You can do it as follows. If your ground truth y_true is dense (shape N×3), you can use tf.reduce_all(y_true == [0.0, 0.0, 1.0], axis=-1, keepdims=True) and tf.reduce_all(y_true == [1.0, 0.0, 0.0], axis=-1, keepdims=True) to control the if/elif/else branches. You can optimize it further with tf.gather.
import tensorflow as tf

def sparse_loss(y_true, y_pred):
    """Calculate loss for game. Follows the Keras loss signature.

    Args:
      y_true: Sparse tensor of shape N×1, where the correct prediction
        is encoded as 0, 1, or 2.
      y_pred: Tensor of shape N×3. For each row, the three columns
        represent the predicted probability of each state.
        For example, [0.1, 0.3, 0.6] means, "There's a 10% chance the
        right state is 0, a 30% chance the right state is 1,
        and a 60% chance the right state is 2."
    """
    # This is the unvectorized implementation on individual rows, which is
    # more intuitive. But TF requires vectorization.
    # if y_true == 0:
    #   # Value matrix is shape 3. Broadcasting will occur.
    #   return -tf.reduce_sum(y_pred * [3.0, 0.0, -1.0])
    # elif y_true == 2:
    #   return -tf.reduce_sum(y_pred * [-1.0, 0.0, 3.0])
    # else:
    #   # According to the rules, this is never the correct
    #   # state to predict, so it should never show up.
    #   assert False, f'Impossible state reached. y_true: {y_true}, y_pred: {y_pred}.'

    # We vectorize by calculating the reward for all predictions for two cases:
    # y_true is zero, or y_true is two. To eliminate this inefficiency, we
    # could use tf.gather to build an N×3 shaped matrix to multiply against.
    y_true = tf.cast(y_true, tf.float32)  # allow the comparison against 0.0 below
    reward_for_true_zero = tf.reduce_sum(y_pred * [3.0, 0.0, -1.0], axis=-1, keepdims=True)  # N×1
    reward_for_true_two = tf.reduce_sum(y_pred * [-1.0, 0.0, 3.0], axis=-1, keepdims=True)   # N×1
    reward = tf.where(y_true == 0.0, reward_for_true_zero, reward_for_true_two)  # N×1
    return -tf.reduce_sum(reward)
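The tf.gather optimization mentioned in the comment above could look like the following minimal sketch; the lookup table REWARDS and the name sparse_loss_gather are my own, with the row values taken from the rules stated earlier. It indexes a per-class reward row for each label, so only one multiply is needed instead of computing both cases:

import tensorflow as tf

# Row i holds the per-prediction rewards when the true state is i.
REWARDS = tf.constant([[ 3.0, 0.0, -1.0],   # true state 0
                       [ 0.0, 0.0,  0.0],   # true state 1 (never the correct state per the rules)
                       [-1.0, 0.0,  3.0]])  # true state 2

def sparse_loss_gather(y_true, y_pred):
    # y_true: (N, 1) integer-encoded states; y_pred: (N, 3) probabilities.
    idx = tf.reshape(tf.cast(y_true, tf.int32), (-1,))  # (N,)
    rows = tf.gather(REWARDS, idx)                      # (N, 3) value matrix
    reward = tf.reduce_sum(rows * y_pred, axis=-1)      # (N,)
    return -tf.reduce_sum(reward)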
