SAC最大化Q函数的同时,最大化Q函数的熵
Soft Actor-Critic(SAC)算法是一种先进的强化学习算法,它结合了策略梯度和价值函数估计,特别适用于连续动作空间的问题。SAC算法通过引入熵正则化和软化策略更新,增强了智能体的探索能力,并提高了学习效率。
SAC算法的核心原理包括策略优化、价值函数学习和熵优化。它使用策略梯度方法进行优化,并通过最小化策略的熵来增强探索性,同时引入自适应温度参数 αα 来平衡探索与利用。
算法的更新规则涉及多个组件,包括策略网络、两个Q网络以及价值网络。SAC算法利用重参数技巧(reparameterization trick)来稳定策略的输出,并采用两个Q网络的最小值作为目标,减少过高估计Q值的风险。
SAC算法的两种变体包括固定熵正则化系数的SAC和变化熵正则化系数的SAC。固定熵正则化系数的SAC在整个训练过程中保持熵的正则化程度不变,而变化熵正则化系数的SAC则根据训练步数动态调整熵正则化系数的值,以在探索和利用之间取得平衡。
在实际应用中,SAC算法已经显示出了卓越的性能,例如在OpenAI Gym环境中的推车游戏中,SAC算法能够有效地训练智能体保持杆子竖直。此外,SAC算法在多个基准任务中的表现与顶尖算法相当,证明了其强大的性能和适用性。
import gym
from matplotlib import pyplot as plt
%matplotlib inline
#创建环境
env = gym.make('Pendulum-v1')
env.reset()#打印游戏
def show():plt.imshow(env.render(mode='rgb_array'))plt.show()
action网络模型
import torchclass Model(torch.nn.Module):def __init__(self):super().__init__()self.fc_state = torch.nn.Sequential(torch.nn.Linear(3,128),torch.nn.ReLU(),)self.fc_mu = torch.nn.Linear(128,1)self.fc_std = torch.nn.Sequential(torch.nn.Linear(128,1),torch.nn.Softplus(),)def forward(self,state): state = self.fc_state(state)mu = self.fc_mu(state)std = self.fc_std(state)dist = torch.distribution.Noemal(mu,std)#采样b个样本#这里用的是rsample,表示重采样,其实就是先从一个标准正太分布中采样,然后乘以标准差,加上均值sample = dist.rsample()#样本压缩到-1,1之间,求动作action = sample.tanh()#求概率对数prob = dist.log_prob(sample).exp()#这个值描述动作的熵entropy = prob/(1-action.tanh()**2 + 1e-7)entropy = -entropy.log()return action*2,entropy
forward
函数是神经网络模型的核心,它定义了如何从输入状态计算输出动作和相关值。在强化学习中,这个函数是智能体与环境交互并进行决策的基础。在训练过程中,模型的参数会根据算法的反馈进行更新,以优化策略性能。
定义两对value模型
class ModelValue(torch.nn.Module):def __init__(self):super().__init__()self.sequential = torch.nn.Sequential( #构建一个前馈神经网络torch.nn.Linear(4,128),torch.nn.ReLU(),torch.nn.Linear(128,128),torch.nn.ReLU(),torch.nn.Linear(128,1),)def forward(self,state,action):state = torch.cat([state,action],dim=1)return self.sequential(state)model_value1 = ModelValue()
model_value2 = ModelValue()model_value_next1 = ModelValue()
model_value_next2 = ModelValue()model_value_next1.load_state_dict(model_value1.state_dict())
model_value_next2.load_state_dict(model_value2.state_dict())model_value1(torch.randn(2,3),torch.randn(2,1))
获取动作函数
import random
import numpy as np
def get_action(state):state = torch.FloatTensor(state).reshape(1,3)action, _ = model_action(state)return action
-
临时或未使用变量:当函数返回一个元组或多个值,而你只对其中某些值感兴趣时,可以用
_
来接收那些不需要的值。例如,在get_action
函数中,_
用于接收model_action
返回的第二个值,即熵值,这里没有使用它。
更新data函数
#样本池
datas = []#向样本池中添加N条数据,删除M条最古老的数据
def update_data():#初始化游戏state = env.reset()#玩到游戏结束为止over = Falsewhile not over:#根据当前状态得到一个动作action = get_action(state)#执行当作,得到反馈next_state,reward,over, _ = env.step([action])#记录数据样本datas.append((states,action,reward,next_state,over))#更新游戏状态,开始下一个当作state = next_state#数据上限,超出时从最古老的开始删除while len(datas)>10000:datas.pop(0)
获取样本数据
#获取一批数据样本
def get_sample():samples = random.sample(datas,64)#[b,4]state = torch.FloatTensor([i[0]for i in samples]).reshape(-1,3)#[b,1]action = torch.LongTensor([i[1]for i in samples]).reshape(-1,1)#[b,1]reward = torch.FloatTensor([i[2]for i in samples]).reshape(-1,1)#[b,4]next_state = torch.FloatTensor([i[3]for i in samples]).reshape(-1,3)#[b,1]over = torch.LongTensor([i[4]for i in samples]).reshape(-1,1)return state,action,reward,next_state,overstate,action,reward,next_state,over=get_sample()state[:5],action[:5],reward[:5],next_state[:5],over[:5]
测试函数
from IPython import displaydef test(play):#初始化游戏state = env.reset()#记录反馈值的和,这个值越大越好reward_sum= 0#玩到游戏结束为止over =Falsewhile not over:#根据当前状态得到一个动作action = get_action(state)#执行动作,得到反馈staet,reward,over,_=env.step(action)reward_sum+=reward#打印动画if play and random.random()<0.2:display.clear_output(wait=True)show()return reward_sum
软更新函数
#软更新函数
def soft_update(model,model_next):for param,param_next in zip(model.parameters(),model_next.parameters()):#以一个小的比例更新/value = param_next.data*0.995+param.data*0.005param_next.data.copy_(value)
定义alpha
import math#这也是一个可学习的参数
alpha = torch.tensor(math.log(0.01))
alpha.requires_grad = Truealpha
计算target
#计算target,同时要考虑熵
def get_target(reward,next_state,over):#首先使用model_action计算动作和动作熵action,entropy = model_action(next_state)#评估next_state的价值target1 = model_value_next1(next_state,action)target2 = model_value_next2(next_state,action)#取价值小的,这是出于稳定性考虑target = torch.min(target1,target2)#exp和log互为反操作,这里是把alpha还原了#这里的操作是在target熵加上了动作的熵,alpha作为权重系数target+=alpha.exp()*entropytarget*=0.99target*=(1-over)target+=rewardreturn target
计算loss,要求最大化熵,增强动作的随机性
def get_loss_action(state):action,entropy = model_action(state)#使用两个value网络评估action的价值value1 = model_value1(state,action)value2 = model_value2(state,action)#取价值小的,处于稳定性考虑value =torch.min(value1,value2)#alpha还原后乘以熵,这个值期望的是越大越好,但是合理是计算loss,所以符号取反loss_action = -alpha.exp()*entropy#减去value,所以value越大越好,这样loss就会越小loss_action = -valuereturn loss_action.mean(),entropy
训练函数
def train():optimizer_action = torch.optim.Adan(model.action.parameters(),lr=3e-4)optimizer_value1 = torch.optim.Adam(model_value1.parameters(),lr=3e-3)optimizer_value2 = torch.optim.Adam(model_value2.parameters(),lr=3e-3)#alpha也是要更新的参数,所以这里要定义优化器optimizer_alpha = torch.optim.Adam([alpha],lr=3e-4)loss_fn = torch.nn.MSELoss()#训练N次for epoch in range(100):#更新N条数据update_data()#每次更新过数据后,学习N次for i in range(200):#玩一局游戏,得到数据states,rewards,actions,next_states,overs = get_sample()#对reward偏移,为了便于训练reward= (reward+8)/8#计算target,这个target里一局考虑了动作的熵target = get_target(reward,next_state,over)target = target.detach()#计算两个valuevalue1 = model_value(state,action)value2 = model_value(state,action)#计算两个loss,两个value的目标都是要贴近targetloss_value1 = loss_fn(value1,target)loss_value2 = loss_fn(value2,target)#更新参数optimizer.zero_grad() #作用是清除(重置)模型参数的梯度loss.backward() #反向传播计算梯度的标准方法optimizer.step() #更新模型的参数optimizer_td.zero_grad() loss_td.backward() optimizer_td.step() #使用model_value计算model_action的lossloss_action,entropy = get_loss_action(state)optimizer_action.zero_grad()loss_action.backward()optimizer_action.step()#熵乘以alpha就是alpha的lossloss_alpha = (entropy+1).detach()*alpha.exp()loss_alpha = loss_alpha.mean()#更新alpha值optimizer_alpha.zero_grad()loss_alpha.bakcward()optimizer_alpha.step()#增量更新next模型soft_update(model_value1,model_value_next1)soft_update(model_value2, model_value_next2)if i % 10 ==0:test_result = sum([test(play=False)for _ in range(10)])/10print(epoch,len(datas),test_result)