我目前正在研究强化学习中的动态规划,其中我遇到了两个概念:价值迭代和策略迭代。 为了理解这一点,我正在实现萨顿的网格世界示例,其中说:
非终端状态为 S = {1, 2, . . . , 14}。有四个 每个状态下可能的操作,A = {上,下,右,左},其中 确定性地导致相应的状态转换,除了 将代理从电网中移除的操作实际上会离开 状态不变。因此,例如,p(6, 1|5, right( = 1, p(7, 1|7, right( = 1,p(10, r |5, right( = 0 表示属于 R 的所有 r。这是一个 未打折的情节任务。所有转换的奖励为 -1,直到 达到终端状态。终端状态在 图(虽然在两处显示,但形式上是一个状态(。 因此,对于所有状态,期望奖励函数为 r(s, a, s'( = 1 S、S' 和操作 A。假设代理遵循等概率 随机策略(所有操作的可能性相同(。
下面是两种方法的实现
class GridWorld:
def __init__(self,grid_size = 5, gamma = 0.9, penalty = -1.0, theta = 1e-2):
self.grid_size = grid_size
self.discount = gamma
self.actions = [np.array([0, -1]),
np.array([-1, 0]),
np.array([0, 1]),
np.array([1, 0])]
self.action_prob = 1/len(self.actions)
self.theta = theta
print('action prob : ',self.action_prob)
self.penalty_reward = penalty
self.re_init()
def re_init(self):
self.values = np.zeros((self.grid_size, self.grid_size))
self.policy = np.zeros(self.values.shape, dtype=np.int)
def checkTerminal(self,state):
x, y = state
if x == 0 and y == 0:
return 1
elif (x == self.grid_size - 1 and y == self.grid_size - 1):
return 1
else :
return 0
def step(self, state, action):
#print(state)
if self.checkTerminal(state):
next_state = state
reward = 0
else:
next_state = (np.array(state) + action).tolist()
x, y = next_state
if x < 0 or x >= self.grid_size or y < 0 or y >= self.grid_size:
next_state = state
reward = self.penalty_reward
return next_state, reward
def compValueIteration(self):
new_state_values = np.zeros((self.grid_size, self.grid_size))
policy = np.zeros((self.grid_size, self.grid_size))
iter_cnt = 0
while True:
#delta = 0
state_values = new_state_values.copy()
old_state_values = state_values.copy()
for i in range(self.grid_size):
for j in range(self.grid_size):
values = []
for action in self.actions:
(next_i, next_j), reward = self.step([i, j], action)
values.append(reward + self.discount*state_values[next_i, next_j])
new_state_values[i, j] = np.max(values)
policy[i, j] = np.argmax(values)
#delta = max(delta, np.abs(old_state_values[i, j] - new_state_values[i, j]))
delta = np.abs(old_state_values - new_state_values).max()
print(f'Difference: {delta}')
if delta < self.theta:
break
iter_cnt += 1
return new_state_values, policy, iter_cnt
def policyEvaluation(self,policy,new_state_values):
#new_state_values = np.zeros((self.grid_size, self.grid_size))
iter_cnt = 0
while True:
delta = 0
state_values = new_state_values.copy()
old_state_values = state_values.copy()
for i in range(self.grid_size):
for j in range(self.grid_size):
action = policy[i, j]
(next_i, next_j), reward = self.step([i, j], action)
value = self.action_prob * (reward + self.discount * state_values[next_i, next_j])
new_state_values[i, j] = value
delta = max(delta, np.abs(old_state_values[i, j] - new_state_values[i, j]))
print(f'Difference: {delta}')
if delta < self.theta:
break
iter_cnt += 1
return new_state_values
def policyImprovement(self, policy, values, actions):
#expected_action_returns = np.zeros((self.grid_size, self.grid_size, np.size(actions)))
policy_stable = True
for i in range(self.grid_size):
for j in range(self.grid_size):
old_action = policy[i, j]
act_cnt = 0
expected_rewards = []
for action in self.actions:
(next_i, next_j), reward = self.step([i, j], action)
expected_rewards.append(self.action_prob * (reward + self.discount * values[next_i, next_j]))
#max_reward = np.max(expected_rewards)
#new_action = np.random.choice(np.where(expected_rewards == max_reward)[0])
new_action = np.argmax(expected_rewards)
#print('new_action : ',new_action)
#print('old_action : ',old_action)
if old_action != new_action:
policy_stable = False
policy[i, j] = new_action
return policy, policy_stable
def compPolicyIteration(self):
iterations = 0
total_start_time = time.time()
while True:
start_time = time.time()
self.values = self.policyEvaluation(self.policy, self.values)
elapsed_time = time.time() - start_time
print(f'PE => Elapsed time {elapsed_time} seconds')
start_time = time.time()
self.policy, policy_stable = self.policyImprovement(self.policy,self.values, self.actions)
elapsed_time = time.time() - start_time
print(f'PI => Elapsed time {elapsed_time} seconds')
if policy_stable:
break
iterations += 1
total_elapsed_time = time.time() - total_start_time
print(f'Optimal policy is reached after {iterations} iterations in {total_elapsed_time} seconds')
return self.values, self.policy
但是我的两个实现都给出了不同的最优策略和最优值。我完全遵循了书中给出的算法。
策略迭代的结果:
values :
[[ 0. -0.33300781 -0.33300781 -0.33300781]
[-0.33300781 -0.33300781 -0.33300781 -0.33300781]
[-0.33300781 -0.33300781 -0.33300781 -0.33300781]
[-0.33300781 -0.33300781 -0.33300781 0. ]]
****************************************************************************************************
****************************************************************************************************
policy :
[[0 0 0 0]
[1 0 0 0]
[0 0 0 3]
[0 0 2 0]]
值迭代结果:
values :
[[ 0.0 -1.0 -2.0 -3.0]
[-1.0 -2.0 -3.0 -2.0]
[-2.0 -3.0 -2.0 -1.0]
[-3.0 -2.0 -1.0 0.0]]
****************************************************************************************************
****************************************************************************************************
[[0. 0. 0. 0.]
[1. 0. 0. 3.]
[1. 0. 2. 3.]
[1. 2. 2. 0.]]
此外,值迭代在 4 次迭代后收敛,而策略迭代在 2 次迭代后收敛。
我哪里做错了?他们能给出不同的最优政策吗?但我相信,在第三次迭代后,我们得到的值是最佳的。然后,我的代码的策略迭代一定存在一些我无法看到的问题。基本上,我应该如何修复策略?
我想问题出在这些行上:
(1( 价值 = self.action_prob * (奖励 + 自我折扣 * state_values[next_i, next_j]((2(
new_state_values[i, j] = 价值
在这里,您可以直接分配仅从一个操作中获得的值。如果你看一下贝尔曼期望方程,所有动作的开头都有一个总和。您必须考虑来自某个状态的所有操作,对所有可能的操作进行计算 (1(,将它们相加并将(2(中的此总和分配给 (i,j( 的新值。