TL;DR:为什么我执行的前两个动作/观察与回放缓冲区中取出的前两条记录不一致?
TF-Agents 的回放缓冲区(replay buffer)是否会自动打乱数据?
通过添加下面这些打印语句,我可以看到前两个步骤的内容:
print("just addding this as traj num = "+str(num))
print(" next time step = "+str(next_time_step))
replay_buffer.add_batch(traj)
这会产生
just addding this as traj num = 0
next time step = TimeStep(
{'discount': <tf.Tensor: shape=(1,), dtype=float32, numpy=array([0.], dtype=float32)>,
'observation': <tf.Tensor: shape=(1, 1, 5, 5), dtype=float32, numpy=
array([[[[0., 0., 0., 0., 0.],
[0., 0., 0., 0., 0.],
[0., 0., 0., 0., 0.],
[0., 0., 0., 0., 0.],
[0., 0., 0., 1., 0.]]]], dtype=float32)>,
'reward': <tf.Tensor: shape=(1,), dtype=float32, numpy=array([-0.05], dtype=float32)>,
'step_type': <tf.Tensor: shape=(1,), dtype=int32, numpy=array([2])>})
just addding this as traj num = 1
next time step = TimeStep(
{'discount': <tf.Tensor: shape=(1,), dtype=float32, numpy=array([0.], dtype=float32)>,
'observation': <tf.Tensor: shape=(1, 1, 5, 5), dtype=float32, numpy=
array([[[[0., 0., 0., 0., 0.],
[0., 0., 0., 0., 0.],
[0., 0., 0., 0., 0.],
[0., 0., 0., 0., 0.],
[0., 0., 1., 1., 0.]]]], dtype=float32)>,
'reward': <tf.Tensor: shape=(1,), dtype=float32, numpy=array([-0.05], dtype=float32)>,
'step_type': <tf.Tensor: shape=(1,), dtype=int32, numpy=array([2])>})
几行之后,当我把数据集转换为迭代器时,我再次显式地打印第一个数据点。(我已经将批量大小设置为 3,所以应该得到前 3 条记录,但我们得到的似乎是第一条记录的 3 个副本。)
Trajectory(
{'action': <tf.Tensor: shape=(3, 1), dtype=int32, numpy=
array([[3],
[0],
[0]])>,
'discount': <tf.Tensor: shape=(3, 1), dtype=float32, numpy=
array([[0.],
[0.],
[0.]], dtype=float32)>,
'next_step_type': <tf.Tensor: shape=(3, 1), dtype=int32, numpy=
array([[2],
[2],
[2]])>,
'observation': <tf.Tensor: shape=(3, 1, 1, 5, 5), dtype=float32, numpy=
array([[[[[0., 0., 0., 0., 0.],
[0., 0., 0., 0., 0.],
[0., 0., 0., 0., 0.],
[0., 0., 0., 0., 0.],
[0., 0., 0., 0., 0.]]]],
[[[[0., 0., 0., 0., 0.],
[0., 0., 0., 0., 0.],
[0., 0., 0., 0., 0.],
[0., 0., 0., 0., 0.],
[0., 0., 0., 0., 0.]]]],
[[[[0., 0., 0., 0., 0.],
[0., 0., 0., 0., 0.],
[0., 0., 0., 0., 0.],
[0., 0., 0., 0., 0.],
[0., 0., 0., 0., 0.]]]]], dtype=float32)>,
'policy_info': (),
'reward': <tf.Tensor: shape=(3, 1), dtype=float32, numpy=
array([[-1. ],
[-0.05],
[ 1. ]], dtype=float32)>,
'step_type': <tf.Tensor: shape=(3, 1), dtype=int32, numpy=
array([[0],
[0],
[0]])>})
### experience 1 above
### experience 1 above
### experience 1 above
这些经验(experience)中的观察值全是空白的。如果我们继续迭代,仍会得到相同的结果。
这是怎么回事?如何让回放缓冲区中的数据保持与收集时相同的顺序?
########################
下面是可复现的示例(为了缩短篇幅,游戏的大部分内容已被删除):
import tensorflow as tf
from tf_agents.networks import q_network
from tf_agents.agents.dqn import dqn_agent
import tf_agents
import tf_agents.environments.py_environment as PyEnvironment
from tf_agents.trajectories import time_step as ts
import numpy as np
import keras
import tf_agents.policies.random_tf_policy as random_tf_policy
import tf_agents.environments as tf_py_environment
import numpy as np
import random
import copy
class simple_slots():
    """Stripped-down board for a "drop a piece into a column" game.

    The board is stored in ``self.slots`` as a list of ``x`` columns, each a
    list of ``y`` integers: 0 means empty, 1 is a piece placed by
    ``ml_plays_turn`` and 2 is a piece placed by ``script_plays_turn``.
    Pieces fill a column from its highest index towards index 0.
    """

    def __init__(self, x, y):
        """Create an empty board with ``x`` columns of ``y`` slots each."""
        self.x_rows = x
        self.y_rows = y
        self.slots = [[0] * y for _ in range(x)]

    def new_game(self):
        """Empty every slot in place.

        The columns are mutated rather than replaced so that any existing
        reference to ``self.slots`` keeps seeing the current board.
        """
        for column in self.slots:
            column[:] = [0] * len(column)

    def find_lowest_slot(self, x):
        """Return the highest index of an empty slot in column ``x``.

        Returns:
            The slot index (an int, possibly 0), or ``False`` when the column
            has no empty slot. Because ``0 == False`` in Python, callers must
            test the result with ``is False`` rather than by truthiness.

        Raises:
            IndexError: if ``x`` is not a valid column index.
        """
        column = self.slots[x]
        for y_ind in range(len(column) - 1, -1, -1):
            if column[y_ind] == 0:
                return y_ind
        return False

    def _place_piece(self, action, piece):
        """Put ``piece`` in column ``action``; return True if it was placed."""
        try:
            y = self.find_lowest_slot(action)
        except IndexError:
            # Column index outside the board: treat it as "no move possible"
            # instead of crashing the caller.
            return False
        if y is False:
            # Column is full; leave the board untouched.
            return False
        self.slots[action][y] = piece
        return True

    def ml_plays_turn(self, action):
        """Drop the learner's piece (1) into column ``action``.

        Returns:
            True if the piece was placed, False if the column is full or does
            not exist.
        """
        return self._place_piece(action, 1)

    def script_plays_turn(self, action = 5):
        """Drop the scripted opponent's piece (2) into column ``action``.

        The default column is 5, which does not exist on a board with five
        or fewer columns; in that case no piece is placed and False is
        returned.

        Returns:
            True if the piece was placed, False otherwise.
        """
        return self._place_piece(action, 2)

    def arbirtrarily_decide_if_game_over(self):
        """Randomly pick an outcome for the current turn.

        Returns:
            1 with probability 0.2; otherwise a second, independent draw
            yields -1 with probability 0.5 and 0 otherwise. A 0 means the
            game goes on.
        """
        if random.random() < 0.2:
            reward = 1
        elif random.random() < 0.5:
            reward = -1
        else:
            reward = 0
        return reward
class Con4Env(PyEnvironment.PyEnvironment):
    """TF-Agents Python environment wrapping a ``simple_slots``-style game.

    Observations are float32 arrays of shape ``(1, game.x_rows, game.y_rows)``
    holding a copy of the board; actions are a scalar int32 column index in
    ``[0, game.x_rows - 1]``.
    """

    def __init__(self, game):
        """Build the specs and an all-zero state sized from ``game``."""
        super().__init__()
        self.game = game
        self._action_spec = tf_agents.specs.BoundedArraySpec(
            shape=(), dtype=np.int32, minimum=0, maximum=game.x_rows-1 , name='action')
        self._observation_spec = tf_agents.specs.BoundedArraySpec(
            shape=(1, game.x_rows,game.y_rows), dtype=np.float32, minimum=0, name='observation')
        self._state = np.zeros((game.x_rows,game.y_rows) , dtype=np.float32)
        self._time_step_spec = ts.time_step_spec(self._observation_spec)
        self._episode_ended = False

    def action_spec(self):
        """Return the spec of the scalar column-index action."""
        return self._action_spec

    def observation_spec(self):
        """Return the spec of the board observation."""
        return self._observation_spec

    def _reset(self):
        """Zero the observation state and return the first time step.

        Only ``self._state`` is cleared here; the game board itself is
        cleared by ``_step`` when an episode terminates.
        """
        # Size the state from this environment's own game rather than from a
        # module-level variable, so the class works for any game instance.
        self._state = np.zeros((self.game.x_rows, self.game.y_rows), dtype=np.float32)
        self._episode_ended = False
        return ts.restart(np.array([self._state], dtype=np.float32))

    def copy_gameboard_to_state(self):
        """Copy ``game.slots`` into ``self._state``, transposed.

        ``game.slots[i][j]`` is written to ``self._state[j][i]``.
        NOTE(review): ``self._state`` has shape (x_rows, y_rows), so this
        indexing is only in range when the board is square.
        """
        for col_index, column in enumerate(self.game.slots):
            for slot_index, value in enumerate(column):
                self._state[slot_index][col_index] = value

    def _step(self, action):
        """Advance the game by one learner move and return the time step."""
        if self._episode_ended:
            # The previous step ended the episode: start a new one and
            # ignore the supplied action.
            return self.reset()

        reward = self.game.arbirtrarily_decide_if_game_over()
        if reward != 0:
            self._episode_ended = True
        elif self.game.ml_plays_turn(action):
            # NOTE(review): this branch is taken only when ml_plays_turn
            # returns a truthy value; a game whose ml_plays_turn returns
            # None sends every step to the "column full" branch below.
            self.game.script_plays_turn()
        else:
            reward = -0.05  # column full, call it a draw
            self._episode_ended = True

        if self._episode_ended:
            # Clear the board before building the terminal observation.
            self.game.new_game()
            self.copy_gameboard_to_state()
            return ts.termination(np.array([self._state], dtype=np.float32), reward)

        self.copy_gameboard_to_state()
        # NOTE(review): discount=0.0 on a mid-episode transition removes any
        # bootstrapping from later steps; TF-Agents' default is 1.0 - confirm
        # that 0.0 is intended.
        return ts.transition(np.array([self._state], dtype=np.float32), reward=0.0, discount=0.0)
# Give each environment its own board. Sharing one simple_slots instance
# would let steps taken in eval_env change the board that the_env observes.
game = simple_slots(5, 5)
the_env = Con4Env(game)
eval_env = Con4Env(simple_slots(game.x_rows, game.y_rows))
# Wrap the Python environments so they produce and consume batched tensors.
the_env = tf_py_environment.TFPyEnvironment(the_env)
eval_env = tf_py_environment.TFPyEnvironment(eval_env)
# Build the TimeStep spec by hand: scalar step_type / reward / discount specs
# combined with the wrapped environment's observation spec.
# NOTE(review): the_env.time_step_spec() presumably yields an equivalent
# spec directly - confirm against the TF-Agents version in use.
from tf_agents.utils import common
step_type_spec = tf.TensorSpec(shape=(), dtype=tf.dtypes.int32, name='step_type') # scalar int32 step type
reward_spec= tf.TensorSpec(shape=(), dtype=tf.dtypes.float32, name='reward_spec')
discount_spec= tf.TensorSpec(shape=(), dtype=tf.dtypes.float32, name='discount_spec')
time_step_spec = tf_agents.trajectories.TimeStep( step_type_spec ,reward_spec, discount_spec, the_env.observation_spec() )
#####################################################################
# Q-network mapping an observation to one value per action. It uses two
# fully connected layers (75 and 40 units) with ReLU activations and no
# convolutional, dropout or preprocessing layers.
q_net = tf_agents.networks.q_network.QNetwork(
input_tensor_spec = the_env.observation_spec(),
action_spec = the_env.action_spec(),
preprocessing_layers=None,
preprocessing_combiner=None,
conv_layer_params=None,
fc_layer_params=(75, 40),
dropout_layer_params=None,
activation_fn=tf.keras.activations.relu,
kernel_initializer=None,
batch_squash=True,
dtype=tf.float32,
q_layer_activation_fn=None,
name='QNetwork'
)
# Hyper-parameters and construction of the DQN agent.
train_step_counter = tf.Variable(0)
gamma = 0.99
# min_q_value / max_q_value are not passed to the agent below and are not
# referenced anywhere else in the visible code.
min_q_value = -20
max_q_value = 20
# The agent is built with n_step_update=2; this name is rebound to 1 later
# in the script, before the dataset is created.
n_step_update = 2
# DQN agent using the hand-built time_step_spec, the Q-network above, an
# Adam optimizer with learning rate 1e-6 and an element-wise squared TD loss.
agent = dqn_agent.DqnAgent(
time_step_spec ,
the_env.action_spec() ,
q_net,
optimizer = tf.compat.v1.train.AdamOptimizer(learning_rate=0.000001),
n_step_update=n_step_update,
td_errors_loss_fn=common.element_wise_squared_loss,
gamma=gamma,
train_step_counter=train_step_counter
)
# Policy used for data collection, built from the time-step and action specs.
random_policy = random_tf_policy.RandomTFPolicy(time_step_spec, the_env.action_spec())
# data collector
# Spec of the trajectories the agent expects to be collected; printed for inspection.
data_spec=agent.collect_data_spec
print(data_spec)
from tf_agents.utils import common
import copy
# Collection / sampling settings.
replay_buffer_capacity = 999
initial_collect_steps = 50
batch_size = 3
# Rebinds n_step_update from 2 (the value the agent was built with) to 1;
# the dataset created later uses this value as num_steps.
# NOTE(review): TF-Agents DQN training with n_step_update=k is normally fed
# num_steps=k+1 - confirm before training from this dataset.
n_step_update = 1
num_parallel_calls = 2
# The buffer's batch_size is the environment's batch size, i.e. how many
# trajectories each add_batch() call stores; it is separate from the
# sampling batch_size defined above.
replay_buffer = tf_agents.replay_buffers.TFUniformReplayBuffer(
data_spec=agent.collect_data_spec,
batch_size=the_env.batch_size,
max_length=replay_buffer_capacity
)
def collect_step(environment, policy, num):
    """Take one policy step in ``environment`` and store it in the buffer.

    Args:
        environment: environment providing ``current_time_step()``,
            ``reset()`` and ``step()``.
        policy: policy whose ``action(time_step)`` result has an ``action``
            field.
        num: label printed alongside the stored step; used for logging only.

    The resulting trajectory is added to the module-level ``replay_buffer``.
    """
    current = environment.current_time_step()
    # Start a fresh episode when the environment is sitting on a final step.
    time_step = environment.reset() if current.is_last() else current

    policy_step = policy.action(time_step)
    next_time_step = environment.step(policy_step.action)
    transition = tf_agents.trajectories.from_transition(
        time_step, policy_step, next_time_step)

    print(f"just addding this as traj num = {num}")
    print(f" next time step = {next_time_step}")
    replay_buffer.add_batch(transition)
# Fill the buffer with random-policy experience, numbering each stored step.
nom = 0
for _ in range(initial_collect_steps):
    collect_step(the_env, random_policy, nom)
    nom += 1

# Read the buffer back to check that the data comes out in insertion order.
# Without single_deterministic_pass=True the dataset draws entries uniformly
# at random (possibly repeating them), so the first batch would not match
# the first collected steps. With it, the dataset makes one ordered pass.
dataset = replay_buffer.as_dataset(
    num_parallel_calls=num_parallel_calls,
    sample_batch_size=batch_size,
    num_steps=n_step_update,
    single_deterministic_pass=True).prefetch(9)
iterator = iter(dataset)

# First batch of `batch_size` stored steps.
experience, unused_info = next(iterator)
print(experience)
for i in range(3):
    print("### experience 1 above")

# Second batch: the following `batch_size` stored steps.
experience, unused_info = next(iterator)
print(experience)
for i in range(3):
    print("### experience 2 above")
这是一个迟来的回答,供仍在寻找答案的人参考。
# Answer snippet: data_spec, batch_size and max_length are expected to be
# defined by the surrounding code.
replay_buffer = tf_uniform_replay_buffer.TFUniformReplayBuffer(
data_spec,
batch_size=batch_size,
max_length=max_length)
# single_deterministic_pass=True requests a deterministic sample instead of
# a random one.
dataset = replay_buffer.as_dataset(sample_batch_size=32, num_steps=2, num_parallel_calls=1, single_deterministic_pass=True)
iterator = iter(dataset)
# NOTE(review): elsewhere in this file next() on such an iterator is unpacked
# into two values, so `sample` here presumably holds the whole pair.
sample = next(iterator)
关于代码的说明:
- `.as_dataset()` 调用中的 `num_steps` 指定每个样本要按顺序连续取出多少行。这对同时依赖当前步骤和下一步骤的模型很有帮助。(由于你使用的是 `n_step_update=1`,这一点不适用于你的情况,但为了通用性在此一并说明。)这种行为不一定直观。
考虑批量大小为 32 的情况,依次执行:
replay_buffer.add_batch(my_batch_32)
replay_buffer.add_batch(my_new_batch_32)
当 `num_steps=2` 时,每个样本的第一个元素(element)取自第一批,第二个元素取自第二批。
以下是我的笔记,详细介绍了这个过程以及如何使用它:
def get_buffer(num_batches=2):
    """Build a demo replay buffer whose i-th added batch is filled with i.

    Batch 1 is all ones, batch 2 is all twos, and so on. Each batch is
    printed before it is added.

    Args:
        num_batches: how many batches to add. The default is 2 because the
            sampling code that uses this buffer asks for num_steps=2, which
            presumably needs two stored batches to form one sample; a single
            batch would leave nothing to return.

    Returns:
        The populated TFUniformReplayBuffer (batch_size 32, max_length 1000,
        one float32 'action' vector of length 3 per entry).
    """
    data_spec = tf.TensorSpec([3], tf.float32, 'action')
    batch_size = 32
    max_length = 1000
    replay_buffer = tf_uniform_replay_buffer.TFUniformReplayBuffer(
        data_spec,
        batch_size=batch_size,
        max_length=max_length)
    for i in range(1, num_batches + 1):
        # One entry: a length-3 vector filled with the batch number.
        action = np.full(data_spec.shape.as_list(), i, dtype=np.float32)
        # Repeat the entry batch_size times so it matches the buffer's batch.
        values_batched = tf.nest.map_structure(
            lambda t: tf.stack([t] * batch_size), action)
        print(values_batched)
        replay_buffer.add_batch(values_batched)
    return replay_buffer
# Build the demo buffer and read it back with a deterministic dataset.
replay_buffer = get_buffer()
dataset = replay_buffer.as_dataset(sample_batch_size=32, num_steps=2, num_parallel_calls=1, single_deterministic_pass=True)
sampler = iter(dataset)
# Note: because the dataset uses num_steps=2, each sample has shape
# (sample_batch_size, num_steps, size_action_space).
# This means that if batch 1 is all ones and batch 2 is all twos, the first
# element will be (1, 2), NOT (1, 1); i.e. you will get 32 (1, 2) records.
sample, _ = next(sampler)
"""--------------------------------------------------------------------------------------------------------
Observations of Replay Buffer use:
- Replay buffer batch size tells how many items should be added at a single time to the buffer
-If you don't add that many during an add_batch() call, it fails
- Dataset single_Deterministic_pass can be used to get a random sample if False, or a deterministic sample if True
- as_dataset()
- Batch size tells how many to records to pull. It does not have to be the same as replay_buffer batch size
- num_steps tells how many consecutive rows to combine into a single "row". Some loss functions (SAC) require two
which pertain to the current and next time steps
- next(iter(dataset)) gets the first value from a batch, then the next from the following batch when considering num_steps.
-This implies that a batch should be thought of as the SAME time step?
- when sampling with deterministic=False, it can sample the same element multiple times.
- next() throws an OutOfRangeError()
--------------------------------------------------------------------------------------------------------"""