飞桨强化学习从零实践(一):基于价值的方法
写在前面
不要重复造轮子,学会使用轮子。
本文源于百度AI平台飞桨学院《世界冠军带你从零实践强化学习》课程中我自己的心得和理解,感谢科科老师这几天精彩的讲解,带我们从小乌龟(Sarsa、Q-learning),到倒立摆(DQN,PG),再到四旋翼飞行器(DDPG),逐步领略强化学习的魅力。本文旨在介绍PARL框架的使用方法,并从模型的理解和代码的构建角度来整理所学内容,不求详尽但求简洁明了,看完都能学会Model
、Algorithm
、Agent
的构建方法。我认为强化学习中对算法每一个概念的理解很重要,你可以不懂公式的推导,但是只要你理解了算法框图中的每一个步骤,那你就能够灵活的应用PARL框架去解决自己的问题。为了让大家理解代码的模块化构建,这篇文章只介绍Sarsa、Q-learning和DQN,前两者只用了一个Agent
函数,后者用了PARL的Model
、Algorithm
、Agent
模块,对比两种构建方式的不同,我们就可以很轻松的举一反三,PG和DDPG同样也可以用这三大模块构建。
依赖库是python初学者永远的痛点,相信很多人都有网上查到代码,本地一运行各种报错的经历,本文在各代码之前列举了所需依赖库,希望大家自己研究代码时能一次跑通。所有课程官方代码和作业答案都已上传至我的GitHub,欢迎大家star一下,文章中有问题或疏漏的地方大家也可以直接查看源码。
开始之前我希望大家都具有python编程基础,一定程度上掌握Numpy的用法,这个库真的是很有用。没用过的可以参考我之前的文章(查看这里)。学习过程中也可以参考大神的文章,推荐Tiny Tony、Mr.郑先生_、hhhsy、叶强。
强化学习——从尝试到决策
每个人都是过去经验的总和。
初识强化学习
强化学习(Reinforcement learning,简称RL)是机器学习中的一个领域,强调如何基于环境而行动,以取得最大化的预期利益。核心思想是:智能体(agent)在环境(environment)中学习,根据环境的状态(state)或观测(observation),执行动作(action),并根据环境的反馈 (reward)来指导更好的动作。
作为机器学习三巨头之一,强化学习和监督学习以及非监督学习关系如下图。
监督学习关注的是认知,而强化学习关注的是决策。简单的说,前者学习经验,后者运用经验。同样都是一张小熊的图片,监督学习输出的是灰熊、熊猫还是老虎,强化学习输出的是装死、逃跑还是干一架。
强化学习的分类和方法
强化学习主要关注的是无模型的问题,在未知的环境中进行探索学习,在生活中有广泛的应用。
其探索方案有二:
- 基于价值的方法(Q函数)
给每个状态都赋予一个价值的概念,来代表这个状态是好还是坏,这是一个相对的概念,让智能体往价值最高的方向行进。基于价值是确定性的。 - 基于策略的方法(Policy)
制定出多个策略,策略里的每个动作都有一定的概率,并让每一条策略走到底,最后查看哪个策略是最优的。基于策略是随机性的。
PARL框架和GYM环境
- 强化学习经典环境库GYM将环境(Env)交互接口规范化为:重置环境
reset()
、交互step()
、渲染render()
。 - 强化学习框架库PARL将强化学习框架抽象为:
Model
、Algorithm
、Agent
三层,使得强化学习算法的实现和调试更方便和灵活。(前两者有神经网络才用得上)
Agent
的训练(Train)和测试(Test)过程直接上图展示如下。
本文所需全部依赖库代码如下,paddlepaddle默认使用CPU版本,可自行选用GPU版本,这里不再赘述。
# 可以直接 pip install -r requirements.txt
pip install paddlepaddle==1.6.3
pip install parl==1.3.1
pip install gym
pip install atari-py
pip install rlschool==0.3.1
基于表格型方法求解RL
没有什么比前人走过的路(Q)更有价值(V)。
序列决策的经典表达(MDP)
某一状态信息包含了所有相关的历史,只要当前状态可知,所有的历史信息都不再需要,当前状态就可以决定未来,则认为该状态具有马尔科夫性。马尔可夫决策过程(MDP)是序列决策的数学模型,它是一个无记忆的随机过程,可以用一个元组<S,P>表示,其中S是有限数量的状态集,P是状态转移概率矩阵。
强化学习中我们引入奖励R和动作A来描述环境,构成MDP五元组<S,A,P,R,>,其中P函数表示环境的随机性,R函数其实是P函数的一部分,表示获得的收益,是衰减因子以适当的减少对未来收益的考虑。
同样以熊问题为例,对应的是Model-free的情况,即P函数和R函数都未知的情况。这时我们用价值V代表某一状态的好坏,用Q函数来代表某个状态下哪个动作更好,即状态动作价值。
现实世界中,奖励R往往是延迟的,所以一般会从当前时间点开始,对后续可能得到的收益累加,以此来计算当前的价值。但是有时候目光不要放得太长远,对远一些的东西当作近视看不见就好。适当地引入一个衰减因子,再去计算未来的总收益,的值在0-1之间,时间点越久远,对当前的影响也就越小。
状态动作价值(Q)的求解
假设人走在树林里,先看到树上有熊爪后看到熊,接着就看到熊发怒了,经过很多次之后,原来要见到熊才瑟瑟发抖的,后来只要见到树上有熊爪就会有晕眩和害怕的感觉。也就是说,在不断地训练之后,下一个状态的价值可以不断地强化、影响上一个状态的价值。
这样的迭代状态价值的强化方式被称为时序差分(Temporal Difference)。单步求解Q函数,用来近似,以迭代的方式简化数学公式,最终使得逼近目标值。这里的目标值就是前面提到的未来收益之和。注意实际上是由当前计算出来的,因此。
Sarsa和Q-learning
Sarsa全称是state-action-reward-state’-action’,目的是学习特定的state下,特定action的价值Q,最终建立和优化一个Q表格,以state为行,action为列,根据与环境交互得到的reward来更新Q表格,更新公式即为上面的迭代公式。Sarsa在训练中为了更好的探索环境,采用ε-greedy方式(如下图)来训练,有一定概率随机选择动作输出。
Q-learning也是采用Q表格的方式存储Q值,探索部分与Sarsa是一样的,采用ε-greedy方式增加探索。
- Q-learning跟Sarsa不一样的地方是更新Q表格的方式,即learn()函数。
- Sarsa是on-policy,先做出动作再learn,Q-learning是off-policy,learn时无需获取下一步动作。
二者更新Q表格的方式分别为:
二者算法对比如下图所示,有三处不同点。
on-policy优化的是目标策略,用下一步一定会执行的动作来优化Q表格;off-policy实际上有两种不同的策略,期望得到的目标策略和大胆探索的行为策略,在目标策略的基础上用行为策略获得更多的经验。
代码构建与演示
import numpy as np
Sarsa Agent构建
class SarsaAgent(object):
def __init__(self,
obs_n,
act_n,
learning_rate=0.01,
gamma=0.9,
e_greed=0.1):
self.act_n = act_n # 动作维度,有几个动作可选
self.lr = learning_rate # 学习率
self.gamma = gamma # reward的衰减率
self.epsilon = e_greed # 按一定概率随机选动作
self.Q = np.zeros((obs_n, act_n)) # 构建Q表格
# 根据输入观察值,采样输出的动作值,带探索
def sample(self, obs):
if np.random.uniform(0, 1) < (1.0 - self.epsilon): #根据table的Q值选动作
action = self.predict(obs)
else:
action = np.random.choice(self.act_n) #有一定概率随机探索选取一个动作
return action
# 根据输入观察值,预测输出的动作值
def predict(self, obs):
Q_list = self.Q[obs, :]
maxQ = np.max(Q_list)
action_list = np.where(Q_list == maxQ)[0] # maxQ可能对应多个action(where返回一个tuple,需要取[0])
action = np.random.choice(action_list) # 随机选择一个action
return action
# 学习方法,也就是更新Q-table的方法
def learn(self, obs, action, reward, next_obs, next_action, done):
""" on-policy
obs: 交互前的obs, s_t
action: 本次交互选择的action, a_t
reward: 本次动作获得的奖励r
next_obs: 本次交互后的obs, s_t+1
next_action: 根据当前Q表格, 针对next_obs会选择的动作, a_t+1
done: episode是否结束
"""
predict_Q = self.Q[obs, action]
if done:
target_Q = reward # 没有下一个状态了
else:
target_Q = reward + self.gamma * self.Q[next_obs,
next_action] # Sarsa
self.Q[obs, action] += self.lr * (target_Q - predict_Q) # 修正q
Q-learning Agent构建
class QLearningAgent(object):
def __init__(self,
obs_n,
act_n,
learning_rate=0.01,
gamma=0.9,
e_greed=0.1):
self.act_n = act_n # 动作维度,有几个动作可选
self.lr = learning_rate # 学习率
self.gamma = gamma # reward的衰减率
self.epsilon = e_greed # 按一定概率随机选动作
self.Q = np.zeros((obs_n, act_n))
# 根据输入观察值,采样输出的动作值,带探索
def sample(self, obs):
# 内容略,和sarsa一样
return action
# 根据输入观察值,预测输出的动作值
def predict(self, obs):
# 内容略,和sarsa一样
return action
# 学习方法,也就是更新Q-table的方法
def learn(self, obs, action, reward, next_obs, done):
""" off-policy
obs: 交互前的obs, s_t
action: 本次交互选择的action, a_t
reward: 本次动作获得的奖励r
next_obs: 本次交互后的obs, s_t+1
done: episode是否结束
"""
predict_Q = self.Q[obs, action]
if done:
target_Q = reward # 没有下一个状态了
else:
target_Q = reward + self.gamma * np.max(
self.Q[next_obs, :]) # Q-learning
self.Q[obs, action] += self.lr * (target_Q - predict_Q) # 修正q
训练过程代码如下:
# train.py
import gym
from gridworld import CliffWalkingWapper, FrozenLakeWapper
from agent import SarsaAgent
import time
def run_episode(env, agent, render=False):
total_steps = 0 # 记录每个episode走了多少step
total_reward = 0
obs = env.reset() # 重置环境, 重新开一局(即开始新的一个episode)
action = agent.sample(obs) # 根据算法选择一个动作
while True:
next_obs, reward, done, _ = env.step(action) # 与环境进行一个交互
next_action = agent.sample(next_obs) # 根据算法选择一个动作
# 训练 Sarsa 算法
agent.learn(obs, action, reward, next_obs, next_action, done)
action = next_action # 存储上一个动作
# 训练 q-learing 算法
# agent.learn(obs, action, reward, next_obs, done)
obs = next_obs # 存储上一个观察值
total_reward += reward
total_steps += 1 # 计算step数
if render:
env.render() #渲染新的一帧图形
if done:
break
return total_reward, total_steps
def test_episode(env, agent):
total_reward = 0
obs = env.reset()
while True:
action = agent.predict(obs) # greedy
next_obs, reward, done, _ = env.step(action)
total_reward += reward
obs = next_obs
time.sleep(0.5)
env.render()
if done:
print('test reward = %.1f' % (total_reward))
break
def main():
# env = gym.make("FrozenLake-v0", is_slippery=False) # 0 left, 1 down, 2 right, 3 up
# env = FrozenLakeWapper(env)
env = gym.make("CliffWalking-v0") # 0 up, 1 right, 2 down, 3 left
env = CliffWalkingWapper(env)
agent = SarsaAgent(
obs_n=env.observation_space.n,
act_n=env.action_space.n,
learning_rate=0.1,
gamma=0.9,
e_greed=0.1) # 或 QLearningAgent
is_render = False
for episode in range(500):
ep_reward, ep_steps = run_episode(env, agent, is_render)
print('Episode %s: steps = %s , reward = %.1f' % (episode, ep_steps,
ep_reward))
# 每隔20个episode渲染一下看看效果
if episode % 20 == 0:
is_render = True
else:
is_render = False
# 训练结束,查看算法效果
test_episode(env, agent)
if __name__ == "__main__":
main()
可以看到二者区别全在learn
函数,训练过程的差别也仅在learn
的输入是否有next_action
。大家可以尝试用命令行运行以下演示代码(这里下载),训练环境是悬崖问题(CliffWalking)。作业中的冰湖问题(FrozenLake)见.\tutorials\homework
文件夹。
# sarsa 演示
cd .\tutorials\lesson2\sarsa
python .\train.py
# q-learing 演示
cd .\tutorials\lesson2\q_learning
python .\train.py
最终结果如下面的GIF所示。可以看出来Sarsa比较保守,会选择下限最高的路线,即尽可能远离悬崖的路线,虽然路程远了,但至少掉下悬崖的风险就小了很多;而Q-learning比较激进,会选择上限最高的路线,即路程最近的路线,但存在些微扰动便可能掉下悬崖。
Sarsa训练图:
Q-learning训练图:
基于神经网络方法求解RL
选择(A)有限,而人生(S)无限。
神经网络近似Q函数
前面提到的悬崖问题,状态(S)总量很少,但实际生活中,很多常见问题的状态都是数量庞大的,如象棋、围棋等。即使用Q表格装下所有状态,表格可能占用极大内存,表格的查找也相当费时。我们就可以用带参数的Q函数来近似Q表格,比如可以用多项式函数或者神经网络,优势是只需要输出少量参数,同时能实现状态泛化。
神经网络结构如下图所示。
- 神经元:神经网络中每个节点称为神经元,由两部分组成:
- 加权和:将所有输入加权求和。
- 非线性变换(激活函数):加权和的结果经过一个非线性函数变换,让神经元计算具备非线性的能力。
- 多层连接: 大量这样的节点按照不同的层次排布,形成多层的结构连接起来,即称为神经网络。
- 前向计算: 从输入计算输出的过程,顺序从网络前至后。
- 计算图: 以图形化的方式展现神经网络的计算逻辑又称为计算图。我们也可以将神经网络的计算图以公式的方式表达为
由此可见,神经网络的本质是一个含有很多参数的“大公式”。
DQN:入门deep RL
DQN本质上是一个Q-learning算法,但使用神经网络来近似替代Q表格。
类比监督学习的训练,DQN的训练过程也非常地类似,它输入的是一批状态(S),输出的是对应的Q。计算输出的Q与Target Q的均方差,进行优化,以此更新神经网络参数。
在Q-learning的基础上,DQN提出了两个技巧使得Q网络的更新迭代更稳定:
- 经验回放 (Experience Replay):主要解决样本关联性和利用效率的问题。使用一个经验池(buffer)存储多条经验(s,a,r,s’),再从中随机抽取一批(batch)数据送去训练。
- 固定Q目标 (Fixed-Q-Target):主要解决算法训练不稳定的问题。每隔一段时间复制一次网络参数(w),保证用于计算Target Q的Q函数不变。
代码构建与演示
这部分代码构建展示了PARL框架的基本用法,即Model
、Algorithm
、Agent
嵌套。Algorithm
是Agent
的一部分,Model
又是Algorithm
的一部分,相比较前文中只有一个Agent
的简单代码模块性更好,但理解起来也更复杂,容我慢慢道来。
import parl
from parl import layers
import paddle.fluid as fluid
import copy
import numpy as np
Model
用来定义前向(Forward
)网络,用户可以自由的定制自己的神经网络结构。
class Model(parl.Model):
def __init__(self, act_dim):
hid1_size = 128
hid2_size = 128
# 3层全连接(fc)网络,act选择激活函数,不使用激活函数即线性
self.fc1 = layers.fc(size=hid1_size, act='relu')
self.fc2 = layers.fc(size=hid2_size, act='relu')
self.fc3 = layers.fc(size=act_dim, act=None)
def value(self, obs):
# 前向计算 Q实际上是一个二维数组
# 输入state,输出所有action对应的Q,[Q(s,a1), Q(s,a2), Q(s,a3)...]
h1 = self.fc1(obs)
h2 = self.fc2(h1)
Q = self.fc3(h2)
return Q
Algorithm
定义了具体的算法来更新前向网络(Model
),也就是通过定义损失函数来更新Model
,和算法相关的计算都放在algorithm
中。
# from parl.algorithms import DQN # 也可以直接从parl库中导入DQN算法
class DQN(parl.Algorithm):
def __init__(self, model, act_dim=None, gamma=None, lr=None):
""" DQN algorithm
Args:
model (parl.Model): 定义Q函数的前向网络结构
act_dim (int): action空间的维度,即有几个action
gamma (float): reward的衰减因子
lr (float): learning rate 学习率.
"""
self.model = model
self.target_model = copy.deepcopy(model)
# 参数的类型要求,且必须有输入
assert isinstance(act_dim, int)
assert isinstance(gamma, float)
assert isinstance(lr, float)
self.act_dim = act_dim
self.gamma = gamma
self.lr = lr
def predict(self, obs):
""" 使用self.model的value网络来获取 [Q(s,a1),Q(s,a2),...]
"""
return self.model.value(obs)
def learn(self, obs, action, reward, next_obs, terminal):
""" 使用DQN算法更新self.model的value网络
"""
# 从target_model中获取 max Q' 的值,用于计算target_Q
next_pred_value = self.target_model.value(next_obs)
# dim=0指最外边的[],dim增加向内数一个[]
# 这里value都自带两个[],所以dim=1,计算里面那个[]
best_v = layers.reduce_max(next_pred_value, dim=1)
# 阻止梯度传递保证target不变,否则paddle的优化器默认迭代一步梯度
best_v.stop_gradient = True
# 判断是否是最后一条经验
terminal = layers.cast(terminal, dtype='float32')
target = reward + (1.0 - terminal) * self.gamma * best_v
pred_value = self.model.value(obs) # 获取Q预测值
# 将action转onehot向量,比如:3 => [0,0,0,1,0]
action_onehot = layers.one_hot(action, self.act_dim)
action_onehot = layers.cast(action_onehot, dtype='float32')
# 下面一行是逐元素相乘,拿到action对应的 Q(s,a)
# 比如:pred_value = [[2.3, 5.7, 1.2, 3.9, 1.4]], action_onehot = [[0,0,0,1,0]]
# ==> pred_action_value = [[3.9]]
pred_action_value = layers.reduce_sum(
layers.elementwise_mul(action_onehot, pred_value), dim=1)
# 计算 Q(s,a) 与 target_Q的均方差,得到loss
cost = layers.square_error_cost(pred_action_value, target)
cost = layers.reduce_mean(cost)
optimizer = fluid.optimizer.Adam(learning_rate=self.lr) # 使用Adam优化器
optimizer.minimize(cost)
return cost
def sync_target(self):
""" 把 self.model 的模型参数值同步到 self.target_model
"""
# 同步参数就行,比直接copy效率高
self.model.sync_weights_to(self.target_model)
Agent
负责算法与环境的交互,在交互过程中把生成的数据提供给Algorithm
来更新模型(Model
),数据的预处理流程也一般定义在这里。
class Agent(parl.Agent):
def __init__(self,
algorithm,
obs_dim,
act_dim,
e_greed=0.1,
e_greed_decrement=0):
assert isinstance(obs_dim, int)
assert isinstance(act_dim, int)
self.obs_dim = obs_dim
self.act_dim = act_dim
# 将Algorithm传入Agent,即self.alg
super(Agent, self).__init__(algorithm)
self.global_step = 0
self.update_target_steps = 200
# 每隔200个training steps再把model的参数复制到target_model中
self.e_greed = e_greed # 有一定概率随机选取动作,探索
self.e_greed_decrement = e_greed_decrement # 随着训练逐步收敛,探索的程度慢慢降低
def build_program(self):
# 这一部分是搭建predict和learn框架,给定数据大小和类型
# 至于输入实际数据的代码,在后面的predict和learn函数
self.pred_program = fluid.Program()
self.learn_program = fluid.Program()
with fluid.program_guard(self.pred_program):
# 搭建计算图用于 预测动作,定义输入输出变量
obs = layers.data(
name='obs', shape=[self.obs_dim], dtype='float32')
self.value = self.alg.predict(obs)
with fluid.program_guard(self.learn_program):
# 搭建计算图用于 更新Q网络,定义输入输出变量
obs = layers.data(
name='obs', shape=[self.obs_dim], dtype='float32')
action = layers.data(name='act', shape=[1], dtype='int32')
reward = layers.data(name='reward', shape=[], dtype='float32')
next_obs = layers.data(
name='next_obs', shape=[self.obs_dim], dtype='float32')
terminal = layers.data(name='terminal', shape=[], dtype='bool')
self.cost = self.alg.learn(obs, action, reward, next_obs, terminal)
def sample(self, obs):
sample = np.random.rand() # 产生0~1之间的小数
if sample < self.e_greed:
act = np.random.randint(self.act_dim) # 探索:每个动作都有概率被选择
else:
act = self.predict(obs) # 选择最优动作
# 前面和sarsa基本一样,不同的是随着训练逐步收敛,探索的程度慢慢降低
self.e_greed = max(
0.01, self.e_greed - self.e_greed_decrement)
return act
def predict(self, obs): # 选择最优动作,和sample的else部分一致
# 扩展维度,因为obs是[...],但是Q是[[...]]
obs = np.expand_dims(obs, axis=0)
pred_Q = self.fluid_executor.run(
self.pred_program,
feed={'obs': obs.astype('float32')},
fetch_list=[self.value])[0]
# 降低维度,理由同上
pred_Q = np.squeeze(pred_Q, axis=0) # 若axis=0的维度len==1则移除
act = np.argmax(pred_Q) # 选择Q最大的下标,即对应的动作
return act
def learn(self, obs, act, reward, next_obs, terminal):
# 每隔200个training steps同步一次model和target_model的参数
if self.global_step % self.update_target_steps == 0:
self.alg.sync_target()
self.global_step += 1
act = np.expand_dims(act, -1)
feed = {
'obs': obs.astype('float32'),
'act': act.astype('int32'),
'reward': reward,
'next_obs': next_obs.astype('float32'),
'terminal': terminal
}
cost = self.fluid_executor.run(
self.learn_program, feed=feed, fetch_list=[self.cost])[0] # 训练一次网络
return cost
- 经验池
replay_memory
:用于存储多条经验,实现经验回放。
# replay_memory.py
import random
import collections
import numpy as np
class ReplayMemory(object):
def __init__(self, max_size):
self.buffer = collections.deque(maxlen=max_size)
# 增加一条经验到经验池中
def append(self, exp):
self.buffer.append(exp)
# 从经验池中选取N条经验出来
def sample(self, batch_size):
mini_batch = random.sample(self.buffer, batch_size)
obs_batch, action_batch, reward_batch, next_obs_batch, done_batch = [], [], [], [], []
for experience in mini_batch:
s, a, r, s_p, done = experience
obs_batch.append(s)
action_batch.append(a)
reward_batch.append(r)
next_obs_batch.append(s_p)
done_batch.append(done)
return np.array(obs_batch).astype('float32'), \
np.array(action_batch).astype('float32'), np.array(reward_batch).astype('float32'),\
np.array(next_obs_batch).astype('float32'), np.array(done_batch).astype('float32')
def __len__(self):
return len(self.buffer)
训练过程代码如下:
# train.py
import os
import gym
import numpy as np
import parl
from parl.utils import logger # 日志打印工具
from model import Model
from algorithm import DQN # from parl.algorithms import DQN # parl >= 1.3.1
from agent import Agent
from replay_memory import ReplayMemory
LEARN_FREQ = 5 # 训练频率,不需要每一个step都learn,攒一些新增经验后再learn,提高效率
MEMORY_SIZE = 20000 # replay memory的大小,越大越占用内存
MEMORY_WARMUP_SIZE = 200 # replay_memory 里需要预存一些经验数据,再从里面sample一个batch的经验让agent去learn
BATCH_SIZE = 32 # 每次给agent learn的数据数量,从replay memory随机里sample一批数据出来
LEARNING_RATE = 0.001 # 学习率
GAMMA = 0.99 # reward 的衰减因子,一般取 0.9 到 0.999 不等
# 训练一个episode
def run_episode(env, agent, rpm):
total_reward = 0
obs = env.reset()
step = 0
while True:
step += 1
action = agent.sample(obs) # 采样动作,所有动作都有概率被尝试到
next_obs, reward, done, _ = env.step(action)
rpm.append((obs, action, reward, next_obs, done))
# train model 每次learn都随机抽样
if (len(rpm) > MEMORY_WARMUP_SIZE) and (step % LEARN_FREQ == 0):
(batch_obs, batch_action, batch_reward, batch_next_obs,
batch_done) = rpm.sample(BATCH_SIZE)
train_loss = agent.learn(batch_obs, batch_action, batch_reward,
batch_next_obs,
batch_done) # s,a,r,s',done
total_reward += reward
obs = next_obs
if done:
break
return total_reward
# 评估 agent, 跑 5 个episode,总reward求平均
def evaluate(env, agent, render=False):
eval_reward = []
for i in range(5):
obs = env.reset()
episode_reward = 0
while True:
action = agent.predict(obs) # 预测动作,只选最优动作
obs, reward, done, _ = env.step(action)
episode_reward += reward
if render:
env.render()
if done:
break
eval_reward.append(episode_reward)
return np.mean(eval_reward)
def main():
env = gym.make(
'CartPole-v0'
) # CartPole-v0: expected reward > 180 MountainCar-v0 : expected reward > -120
action_dim = env.action_space.n # CartPole-v0: 2
obs_shape = env.observation_space.shape # CartPole-v0: (4,)
rpm = ReplayMemory(MEMORY_SIZE) # DQN的经验回放池
# 根据parl框架构建agent
model = Model(act_dim=action_dim)
algorithm = DQN(model, act_dim=action_dim, gamma=GAMMA, lr=LEARNING_RATE)
agent = Agent(
algorithm,
obs_dim=obs_shape[0],
act_dim=action_dim,
e_greed=0.1, # 有一定概率随机选取动作,探索
e_greed_decrement=1e-6) # 随着训练逐步收敛,探索的程度慢慢降低
# 加载模型
# save_path = './dqn_model.ckpt'
# agent.restore(save_path)
# 先往经验池里存一些数据,避免最开始训练的时候样本丰富度不够
while len(rpm) < MEMORY_WARMUP_SIZE:
run_episode(env, agent, rpm)
max_episode = 2000
# start train
episode = 0
while episode < max_episode: # 训练max_episode个回合,test部分不计算入episode数量
# train part
for i in range(0, 50):
total_reward = run_episode(env, agent, rpm)
episode += 1
# test part
eval_reward = evaluate(env, agent, render=True) # render=True 查看显示效果
logger.info('episode:{} e_greed:{} Test reward:{}'.format(
episode, agent.e_greed, eval_reward))
# 训练结束,保存模型
save_path = './dqn_model.ckpt'
agent.save(save_path)
if __name__ == '__main__':
main()
大家可以尝试用命令行运行以下演示代码(这里下载),训练环境是倒立摆问题(CartPole),倒立摆可以说是强化学习中的hello world,入门必备。作业中的小车上山问题(MountainCar)见.\tutorials\homework
文件夹。
# DQN 演示
cd .\tutorials\lesson3\dqn
python .\train.py
cd .\tutorials\homework\lesson3\dqn_mountaincar
python .\train.py
最终DQN结果如下面的GIF所示。
倒立摆(CartPole):
小车上山(MountainCar):
总结
框架库PARL将强化学习框架抽象为: Model
、Algorithm
、Agent
三层,使得强化学习算法的实现和调试更方便和灵活。前两者尤其针对deep RL的情况,有神经网络时直接调用paddle的api可以很方便的进行网络和算法的构建。本文由强化学习经典算法Sarsa和Q-learning扩展到deep RL的DQN算法,并以DQN为例讲解了PARL的使用方法。下篇文章我会讲基于policy的PG算法和用于连续状态控制的DDPG,敬请期待。