1、深度强化学习(一): Deep Q Network(DQN)(两个网络的区别) 2、深度强化学习——DQN(工作流程图、误差计算方法)
DQN是一种融合了神经网络和 Q learning 的方法,因为传统表格形式的强化学习有这样一个瓶颈:当问题过于复杂,状态过多时,全用表格来存储它们是不现实的。 使用神经网络,我们就可以将状态和动作当成神经网络的输入, 然后经过神经网络分析后得到动作的 Q 值,这样我们就没必要在表格中记录 Q 值,而是直接使用神经网络生成 Q 值。 还有一种形式的是这样,我们也可以只输入状态值,输出所有的动作值,然后按照 Q learning 的原则,直接选择拥有最大值的动作当做下一步要做的动作。
1.初始化记忆库D和容量N,初始化Q的估计值、真实值等参数 2.在每一轮训练中,首先初始化状态序列 和 预处理序列,并在训练中的每一步进行如下操作: (1)根据贪婪度epsilon概率选择下一步的动作action (2)在当前状态state采用行动action后,得到下一状态state’和反馈reward (3)保存这一步的记忆到记忆库D中 (4)当积累一定的步数记忆后,从记忆库D中随机抽取一定数量的记忆作为样本并进行学习,以及更新网络参数 (5)将本次训练的将下一个 state_ 作为 下次循环训练的起始 state
在DQN中,搭建两个神经网络: 其中,target_net 用于预测 q_target 值(Q真实),他不会及时更新参数。它是 eval_net 的一个历史版本,拥有 eval_net 很久之前的一组参数,而且这组参数被固定一段时间,然后再被 eval_net 的新参数所替换。 eval_net 用于预测 q_eval(Q预测),这个神经网络拥有最新的神经网络参数,之后定期使用eval_net更新target_net。
不过这两个神经网络结构是完全一样的,只是里面的参数不一样,如下图: 1.初始化参数
self.n_actions = n_actions self.n_features = n_features self.lr = learning_rate self.gamma = reward_decay self.epsilon_max = e_greedy self.replace_target_iter = replace_target_iter # 每隔replace_target_iter步,更新 target_net self.memory_size = memory_size # 记忆上限 self.batch_size = batch_size # 每次更新时从 memory 里面取多少记忆出来 self.epsilon_increment = e_greedy_increment # epsilon 的增量 self.epsilon = 0 if e_greedy_increment is not None else self.epsilon_max # 是否开启探索模式, 并逐步减少探索次数 # 记录学习次数 (用于判断是否更换 target_net 参数) self.learn_step_counter = 0 # 初始化全 0 记忆 [s, a, r, s_] self.memory = np.zeros((self.memory_size, n_features*2+2))2.建立 evaluate_net
# 用来接收 observation (即state) self.s = tf.placeholder(tf.float32, [None, self.n_features], name='s') # 用来接收 q_target 的值 self.q_target = tf.placeholder(tf.float32, [None, self.n_actions], name='Q_target') with tf.variable_scope('eval_net'): # 配置神经网络各层参数 # c_names(collections_names) 是在更新 target_net 参数时会用到 # 此处即 eval_net_params 中储存所有的 eval_net 的参数 c_names, n_l1, w_initializer, b_initializer = \ ['eval_net_params', tf.GraphKeys.GLOBAL_VARIABLES], 10, tf.random_normal_initializer(0., 0.3), tf.constant_initializer(0.1) # eval_net 的第一层. collections 是在更新 target_net 参数时会用到 with tf.variable_scope('l1'): w1 = tf.get_variable('w1', [self.n_features, n_l1], initializer=w_initializer, collections=c_names) b1 = tf.get_variable('b1', [1, n_l1], initializer=b_initializer, collections=c_names) l1 = tf.nn.relu(tf.matmul(self.s, w1) + b1) # eval_net 的第二层. collections 是在更新 target_net 参数时会用到 with tf.variable_scope('l2'): w2 = tf.get_variable('w2', [n_l1, self.n_actions], initializer=w_initializer, collections=c_names) b2 = tf.get_variable('b2', [1, self.n_actions], initializer=b_initializer, collections=c_names) self.q_eval = tf.matmul(l1, w2) + b2 # 误差,训练 with tf.variable_scope('loss'): self.loss = tf.reduce_mean(tf.squared_difference(self.q_target, self.q_eval)) with tf.variable_scope('train'): self._train_op = tf.train.RMSPropOptimizer(self.lr).minimize(self.loss)3.建立target_net
# 接收下个 observation self.s_ = tf.placeholder(tf.float32, [None, self.n_features], name='s_') with tf.variable_scope('target_net'): # 此处即 target_net_params 中储存所有的 target_net 的参数 c_names = ['target_net_params', tf.GraphKeys.GLOBAL_VARIABLES] with tf.variable_scope('l1'): w1 = tf.get_variable('w1', [self.n_features, n_l1], initializer=w_initializer, collections=c_names) b1 = tf.get_variable('b1', [1, n_l1], initializer=b_initializer, collections=c_names) l1 = tf.nn.relu(tf.matmul(self.s_, w1) + b1) with tf.variable_scope('l2'): w2 = tf.get_variable('w2', [n_l1, self.n_actions], initializer=w_initializer, collections=c_names) b2 = tf.get_variable('b2', [1, self.n_actions], initializer=b_initializer, collections=c_names) self.q_next = tf.matmul(l1, w2) + b24.存储训练中每一步的记忆
def store_transition(self, s, a, r, s_): if not hasattr(self, 'memory_counter'): self.memory_counter = 0 # 记录记忆库最新一条的索引 # np.hstack(): 按水平方向(列顺序)堆叠数组构成一个新的数组 transition = np.hstack((s, [a, r], s_)) # 记录一条 [s, a, r, s_] 记录 # 总 memory 大小是固定的, 如果超出总大小, 旧 memory 就被新 memory 替换 index = self.memory_counter % self.memory_size self.memory[index, :] = transition # 替换过程 self.memory_counter += 15.选择行为action
def choose_action(self, observation): # 统一 observation 的 shape (1, size_of_observation) observation = observation[np.newaxis, :] if np.random.uniform() < self.epsilon: # 让 eval_net 神经网络生成所有 action 的值, 并选择值最大的 action actions_value = self.sess.run(self.q_eval, feed_dict={self.s: observation}) action = np.argmax(actions_value) else: action = np.random.randint(0, self.n_actions) return action6.学习 q_next, q_eval 包含所有 action 的值,而我们需要的只是已经选择好的 action 的值,其他的并不需要。所以我们将其他的 action 值全变成 0,将用到的 action 误差值 反向传递回去,作为更新凭据。 比如 q_target - q_eval = [1, 0, 0] - [-1, 0, 0] = [2, 0, 0]。 其中,q_eval = [-1, 0, 0] 表示这一个记忆中有我选用过 action 0,而 action 0 带来的 Q(s, a0) = -1,所以其他的 Q(s, a1) = Q(s, a2) = 0。 而 q_target = [1, 0, 0] 表示这个记忆中的 r+gamma*maxQ(s_) = 1,而且不管在 s_ 上我们取了哪个 action,我们都需要对应上 q_eval 中的 action 位置,所以就将 1 放在了 action 0 的位置。
计算方法: 为了更方面让程序运算,将 q_eval 全部赋值给 q_target,这时 q_target - q_eval 全为 0。 然后,再根据 batch_memory 当中的 action 这个 column 来给 q_target 中的对应的 memory-action 位置来修改赋值,使新的赋值为 reward + gamma * maxQ(s_),这样 q_target - q_eval 就可以变成我们所需的样子。
举例: 假如在这个 batch 中,我们有2个提取的记忆,根据每个记忆可以生产3个 action 的值: q_eval = [[1, 2, 3], [4, 5, 6]] 设置: q_target = q_eval = [[1, 2, 3], [4, 5, 6]]
然后根据 memory 当中的具体 action 位置,来修改 q_target 对应 action 上的值: 比如在: 记忆 0 的 q_target 计算值是 -1, 而且我用了 action 0; 记忆 1 的 q_target 计算值是 -2, 而且我用了 action 2: 则 q_target = [[-1, 2, 3], [4, 5, -2]]
所以 (q_target - q_eval) 就变成了: [[(-1)-(1), 0, 0], [0, 0, (-2)-(6)]]
最后我们将这个 (q_target - q_eval) 当成误差,反向传递会神经网络。 其中,所有为 0 的 action 值是当时没有选择的 action,之前有选择的 action 才有不为0的值。而我们只反向传递之前选择的 action 的值。
def learn(self): # 检查是否替换 target_net 参数 if self.learn_step_counter % self.replace_target_iter == 0: self.sess.run(self.replace_target_op) print('\ntarget_params_replaced\n') # 从 memory 中随机抽取 batch_size 这么多记忆 if self.memory_counter > self.memory_size: sample_index = np.random.choice(self.memory_size, size=self.batch_size) else: sample_index = np.random.choice(self.memory_counter, size=self.batch_size) batch_memory = self.memory[sample_index, :] # 获取 q_next (target_net 产生了 q) 和 q_eval(eval_net 产生的 q) q_next, q_eval = self.sess.run( [self.q_next, self.q_eval], feed_dict={ self.s_: batch_memory[:, -self.n_features:], self.s: batch_memory[:, :self.n_features] }) # 重要步骤 q_target = q_eval.copy() batch_index = np.arange(self.batch_size, dtype=np.int32) eval_act_index = batch_memory[:, self.n_features].astype(int) reward = batch_memory[:, self.n_features + 1] q_target[batch_index, eval_act_index] = reward + self.gamma * np.max(q_next, axis=1) # 训练 eval_net _, self.cost = self.sess.run([self._train_op, self.loss], feed_dict={self.s: batch_memory[:, :self.n_features], self.q_target: q_target} ) self.cost_his.append(self.cost) # 记录 cost 误差 # 逐渐增加 epsilon, 降低行为的随机性 self.epsilon = self.epsilon + self.epsilon_increment if self.epsilon < self.epsilon_max else self.epsilon_max self.learn_step_counter += 17.误差曲线
def plot_cost(self): import matplotlib.pyplot as plt plt.plot(np.arange(len(self.cost_his)), self.cost_his) plt.ylabel('Cost') plt.xlabel('training steps') plt.show()