一、多目标强化学习原理
1. 多目标学习核心思想
多目标强化学习(Multi-Objective RL)旨在让智能体同时优化多个冲突目标,通过平衡目标间的权衡关系找到帕累托最优解集。与传统强化学习的区别在于:
对比维度 | 传统强化学习 | 多目标强化学习 |
---|---|---|
目标数量 | 单一奖励函数 | 多个奖励函数(可能相互冲突) |
优化目标 | 最大化单一累计奖励 | 找到帕累托最优策略集合 |
解的唯一性 | 唯一最优解 | 多个非支配解(Pareto Front) |
应用场景 | 目标明确且无冲突的任务 | 自动驾驶(安全 vs 效率)、资源分配 |
2. 多目标问题建模
二、标量化方法(Scalarization)
三、多目标 PPO 算法实现(基于 Gymnasium)
以 自定义机器人控制环境 为例,实现基于权重调整的多目标 PPO 算法:
-
定义多目标环境:机器人需同时最大化前进速度和最小化能耗
-
构建策略网络:共享特征提取层 + 多目标价值头
-
动态权重调整:根据训练阶段调整目标权重
-
帕累托前沿分析:评估不同权重下的策略性能
四、代码实现
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from torch.distributions import Normal
import gymnasium as gym
import time
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# ================== 自定义多目标环境 ==================
class MultiObjectiveRobotEnv(gym.Env):def __init__(self):self.observation_space = gym.spaces.Box(low=-1, high=1, shape=(4,))self.action_space = gym.spaces.Box(low=-1, high=1, shape=(2,))self.state = Noneself.steps = 0self.max_steps = 200 # 添加最大步数限制def reset(self):self.state = np.random.uniform(-1, 1, size=(4,))self.steps = 0return self.state.copy()def step(self, action):# Clip actions to valid rangeaction = np.clip(action, -1, 1)# 定义两个冲突目标:速度(正向奖励)和能耗(负向奖励)velocity_reward = np.abs(action[0]) # 速度与动作绝对值正相关energy_cost = 0.1 * np.sum(np.square(action)) # 能耗与动作平方正相关self.state += 0.1 * np.array([action[0], action[1], 0, 0]) # 简化动力学self.state = np.clip(self.state, -1, 1) # 保持状态在合理范围内self.steps += 1done = self.steps >= self.max_steps # 超过最大步数时终止return self.state.copy(), [velocity_reward, -energy_cost], done, {}
# ================== 多目标 PPO 策略网络 ==================
class MultiObjectivePPO(nn.Module):def __init__(self, state_dim, action_dim, num_objectives=2):super().__init__()# 共享特征提取层self.shared_net = nn.Sequential(nn.Linear(state_dim, 256),nn.Tanh(), # Using Tanh for more stable gradientsnn.Linear(256, 256),nn.Tanh())# 多目标价值头self.value_heads = nn.ModuleList([nn.Sequential(nn.Linear(256, 1), nn.Tanh()) for _ in range(num_objectives)])# 策略头self.actor_mean = nn.Sequential(nn.Linear(256, action_dim),nn.Tanh() # Output between -1 and 1)self.actor_log_std = nn.Parameter(torch.zeros(action_dim) - 1.0) # Initialize to smaller stddef forward(self, state):features = self.shared_net(state)values = [head(features) for head in self.value_heads]action_mean = self.actor_mean(features)action_std = torch.exp(self.actor_log_std).clamp(1e-4, 1.0) # Clamp std to avoid NaNreturn action_mean, action_std, values
# ================== 训练系统 ==================
class MultiObjectivePPOTrainer:def __init__(self):self.env = MultiObjectiveRobotEnv()self.state_dim = self.env.observation_space.shape[0]self.action_dim = self.env.action_space.shape[0]self.num_objectives = 2self.policy = MultiObjectivePPO(self.state_dim, self.action_dim).to(device)self.optimizer = optim.Adam(self.policy.parameters(), lr=1e-4) # Reduced learning rateself.weights = np.array([0.5, 0.5]) # 初始权重self.gamma = 0.99 # Discount factorself.clip_epsilon = 0.2 # PPO clip parameterself.max_steps_per_episode = 200 # 每episode最大步数def update_weights(self, episode):# 动态调整权重(示例:周期变化)self.weights = np.array([np.sin(episode * 0.01) * 0.5 + 0.5, 1 - (np.sin(episode * 0.01) * 0.5 + 0.5)])self.weights /= np.sum(self.weights)def train(self, max_episodes=1000):for episode in range(max_episodes):self.update_weights(episode)state = self.env.reset()episode_rewards = np.zeros(self.num_objectives)episode_steps = 0# 采集轨迹数据states, actions, log_probs, objectives = [], [], [], []for _ in range(self.max_steps_per_episode):state_tensor = torch.FloatTensor(state).unsqueeze(0).to(device)with torch.no_grad():action_mean, action_std, values = self.policy(state_tensor)dist = Normal(action_mean, action_std)action = dist.sample()log_prob = dist.log_prob(action).sum(dim=-1)next_state, obj_rewards, done, _ = self.env.step(action.squeeze(0).cpu().numpy())states.append(state)actions.append(action.squeeze(0))log_probs.append(log_prob)objectives.append(obj_rewards)episode_rewards += np.array(obj_rewards)episode_steps += 1state = next_stateif done:break# 计算加权累计奖励weighted_reward = np.dot(episode_rewards, self.weights)if (episode + 1) % 100 == 0:print(f"Episode {episode+1}/{max_episodes} | Steps: {episode_steps} | "f"Weight: {self.weights} | Reward: {weighted_reward:.1f} | "f"Obj1: {episode_rewards[0]:.1f} | Obj2: {episode_rewards[1]:.1f}")
if __name__ == "__main__":start = time.time()start_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start))print(f"开始时间: {start_str}")print("初始化环境...")trainer = MultiObjectivePPOTrainer()trainer.train(max_episodes=500) # 先测试少量episodeend = time.time()end_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(end))print(f"训练完成时间: {end_str}")print(f"训练完成,耗时: {end - start:.2f}秒")
五、关键代码解析
-
多目标环境设计
-
step()
返回两个奖励:速度奖励(velocity_reward
)和能耗惩罚(-energy_cost
)。 -
通过调整动作的绝对值(速度)和平方值(能耗)实现目标冲突。
-
-
动态权重调整
-
update_weights()
周期性调整目标权重(示例中使用正弦函数)。 -
实际应用中可根据需求设计自适应权重策略。
-
-
策略网络结构
-
共享特征提取层(
shared_net
)学习状态共性表示。 -
独立价值头(
value_heads
)分别预测各目标的价值。
-
六、训练输出示例
开始时间: 2025-03-26 03:37:15
初始化环境...
Episode 100/500 | Steps: 200 | Weight: [0.91801299 0.08198701] | Reward: 53.4 | Obj1: 58.6 | Obj2: -5.2
Episode 200/500 | Steps: 200 | Weight: [0.95670668 0.04329332] | Reward: 59.9 | Obj1: 62.9 | Obj2: -5.7
Episode 300/500 | Steps: 200 | Weight: [0.57550636 0.42449364] | Reward: 31.9 | Obj1: 59.3 | Obj2: -5.2
Episode 400/500 | Steps: 200 | Weight: [0.12488584 0.87511416] | Reward: 2.2 | Obj1: 61.9 | Obj2: -6.3
Episode 500/500 | Steps: 200 | Weight: [0.01914355 0.98085645] | Reward: -4.4 | Obj1: 64.0 | Obj2: -5.8
训练完成时间: 2025-03-26 03:39:16
训练完成,耗时: 120.95秒
七、总结与扩展
本文实现了多目标强化学习的核心范式——基于动态权重的标量化方法,展示了帕累托前沿的探索能力。
在下一篇文章中,我们将探索 稳定扩散模型(Stable Diffusion),并实现文本到图像生成(Text-to-Image Generation)的完整流程!
注意事项
-
安装依赖:
pip install gymnasium torch numpy
-
自定义环境需继承
gym.Env
并实现reset()
和step()
方法。 -
动态权重调整策略可根据实际需求设计(如基于任务难度或用户偏好)。