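Contents
1. Data Processing
1.1. Dataset Download
1.2. Data Loading
1.2.1 Reading the Data
1.2.2 Vocabulary Conversion
1.2.3 Batching the Data
2. Model Construction
2.1 Pooling Operator
2.2 Model Summary
3. Model Training
3.1 Model Training
3.2 Plotting the Accuracy and Loss Curves
4. Model Evaluation
5. Model Prediction
6. Unidirectional LSTM with Torch
6.1 Model Modification: Returning Only the Final Hidden State
6.1.1 Model Training
6.1.2 Model Evaluation
6.2 Model Modification: Hidden State Vectors at All Time Steps
6.2.1 Model Training
6.2.2 Model Evaluation
7. Text Classification with a Bidirectional LSTM and Attention
7.1 Attention Mechanism
7.2 Model Construction
7.2.1 Attention Scoring Function
7.2.2 Dot-Product Attention Scoring Operator
7.2.3 Computing the Attention Distribution
7.2.4 Weighted Average
7.2.5 Model Summary
7.3 Additive Attention Model
7.3.1 Model Training
7.3.2 Model Evaluation
7.4 Dot-Product Attention Model
7.4.1 Model Training
7.4.2 Model Evaluation
7.5 Attention Visualization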
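1. Data Processing
Movie reviews can carry rich sentiment, such as liking or disliking a film. Sentiment analysis is a text classification problem: given a piece of text, decide whether the sentiment it expresses is positive or negative.
This exercise uses the IMDB movie review dataset and applies a bidirectional LSTM to classify the sentiment of each review.
1.1. Dataset Download
Bag of Words Meets Bags of Popcorn | Kaggle
Note that this Kaggle dataset is not the same as the one used in the lab textbook: its test set has no labels. I recommend downloading the data from the PaddlePaddle (飞桨) community instead; register an account if you do not have one.
On the project page, click "Start environment" in the upper-right corner.
Open the project's notebook page and click "dataset" there to download the dataset and the vocabulary.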
1.2. Data Loading
import os

# Load the dataset
def load_imdb_data(path):
    assert os.path.exists(path), f"Path {path} does not exist!"
    trainset, devset, testset = [], [], []
    # Read train.txt, dev.txt and test.txt; every line is "label<TAB>sentence"
    for file_name, dataset in (("train.txt", trainset), ("dev.txt", devset), ("test.txt", testset)):
        with open(os.path.join(path, file_name), "r", encoding="utf-8") as fr:
            for line in fr:
                try:
                    sentence_label, sentence = line.strip().lower().split("\t", maxsplit=1)
                    dataset.append((sentence, sentence_label))
                except ValueError:
                    # Lines without a tab cannot be unpacked into two fields
                    print(f"Skipping invalid line: {line.strip()}")
    return trainset, devset, testset

# Load the IMDB dataset
train_data, dev_data, test_data = load_imdb_data("./dataset/")

# # Print one loaded sample
# print(train_data[4])  # print the 5th sample to confirm the data loaded correctly
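To make the expected file format concrete, here is a minimal sketch of how a single line is split into a sentence and a label. The helper `parse_line` and the three sample lines are hypothetical and exist only for illustration; the logic mirrors the `split("\t", maxsplit=1)` call in `load_imdb_data`.

```python
def parse_line(line):
    """Split one 'label<TAB>sentence' line; return None if it is malformed."""
    parts = line.strip().lower().split("\t", maxsplit=1)
    if len(parts) != 2:
        return None
    label, sentence = parts
    return sentence, label


# Hypothetical lines, invented for illustration only
lines = ["1\tA GREAT movie\n", "0\tboring plot\n", "no tab here\n"]
parsed = [parse_line(line) for line in lines]
print(parsed)
# [('a great movie', '1'), ('boring plot', '0'), None]
```

The third line has no tab, so it is rejected, which corresponds to the "skipping invalid line" branch of the loader.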
1.2.1 Reading the Data
from torch.utils.data import Dataset

class IMDBDataset(Dataset):
    def __init__(self, examples, word2id_dict):
        super(IMDBDataset, self).__init__()
        # Vocabulary used to map each word to its integer ID
        self.word2id_dict = word2id_dict
        # Dataset after the words have been converted to IDs
        self.examples = self.words_to_id(examples)

    def words_to_id(self, examples):
        tmp_examples = []
        for idx, example in enumerate(examples):
            seq, label = example
            # Map each word to its vocabulary ID; words missing from the vocabulary get the ID of [UNK]
            seq = [self.word2id_dict.get(word, self.word2id_dict['[UNK]']) for word in seq.split(" ")]
            label = int(label)
            tmp_examples.append([seq, label])
        return tmp_examples

    def __getitem__(self, idx):
        seq, label = self.examples[idx]
        return seq, label

    def __len__(self):
        return len(self.examples)
1.2.2 Vocabulary Conversion
def load_vocab(vocab_path):
    word2id = {}
    idx = 0
    with open(vocab_path, 'r', encoding='utf-8') as f:
        for line in f:
            word = line.strip()
            word2id[word] = idx  # assign indices in file order
            idx += 1
    return word2id

word2id_dict = load_vocab("./dataset/vocab.txt")

# Instantiate the datasets
train_set = IMDBDataset(train_data, word2id_dict)
dev_set = IMDBDataset(dev_data, word2id_dict)
test_set = IMDBDataset(test_data, word2id_dict)

# print('Number of training samples:', len(train_set))
# print('Sample:', train_set[4])
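As a small worked example of the word-to-ID mapping, the sketch below encodes a sentence with a four-entry vocabulary. The vocabulary contents and the helper name `encode` are hypothetical; the real vocabulary is read from vocab.txt, and its actual IDs will differ.

```python
# Hypothetical four-entry vocabulary; the real one is loaded from vocab.txt
vocab = {"[PAD]": 0, "[UNK]": 1, "great": 2, "movie": 3}


def encode(sentence, word2id):
    """Map whitespace-separated words to IDs, falling back to [UNK]."""
    unk_id = word2id["[UNK]"]
    return [word2id.get(word, unk_id) for word in sentence.split(" ")]


ids = encode("great movie indeed", vocab)
print(ids)
# [2, 3, 1]
```

The word "indeed" is not in the toy vocabulary, so it is replaced by the ID of [UNK].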
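1.2.3 Batching the Data
With the Dataset class in place, we build the corresponding DataLoader to iterate over batches. Unlike the DataLoaders in earlier chapters, this one needs two extra features:
- Length limit: sequence length must be kept within a fixed range so that a few very long samples do not hurt overall training.
- Length padding: a neural network usually needs all sequences in a batch to have the same length, but batching typically puts sequences of different lengths together, so the shorter ones must be padded.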
from functools import partial

import torch
from torch.utils.data import DataLoader

def collate_fn(batch_data, pad_val=0, max_seq_len=256):
    seqs, seq_lens, labels = [], [], []
    max_len = 0
    for example in batch_data:
        seq, label = example
        # Truncate the sequence
        seq = seq[:max_seq_len]
        # Store the truncated sequence, its length and its label
        seqs.append(seq)
        seq_lens.append(len(seq))
        labels.append(label)
        # Track the longest sequence in the batch
        max_len = max(max_len, len(seq))
    # Pad every sequence to the longest length in the batch
    for i in range(len(seqs)):
        seqs[i] = seqs[i] + [pad_val] * (max_len - len(seqs[i]))
    # Return the data as tensors
    return (torch.tensor(seqs), torch.tensor(seq_lens)), torch.tensor(labels)

# # Test collate_fn
# max_seq_len = 5
# batch_data = [[[1, 2, 3, 4, 5, 6], 1], [[2, 4, 6], 0]]
# (seqs, seq_lens), labels = collate_fn(batch_data, pad_val=word2id_dict["[PAD]"], max_seq_len=max_seq_len)
# print("seqs: ", seqs)
# print("seq_lens: ", seq_lens)
# print("labels: ", labels)

max_seq_len = 256
batch_size = 128

# Use partial to fix the keyword arguments of collate_fn
collate_fn = partial(collate_fn, pad_val=word2id_dict["[PAD]"], max_seq_len=max_seq_len)

# Create the PyTorch DataLoaders
train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True, drop_last=False, collate_fn=collate_fn)
dev_loader = DataLoader(dev_set, batch_size=batch_size, shuffle=False, drop_last=False, collate_fn=collate_fn)
test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=False, drop_last=False, collate_fn=collate_fn)
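In the test above (with max_seq_len set to 5), the original sequence of length 6 is truncated to 5, the original sequence of length 3 is padded to 5, and the lengths of the non-[PAD] parts are returned as well.
Next, collate_fn is passed to the DataLoader as a callback, so every batch is processed by collate_fn before it is returned. Note that partial is used to fix the keyword arguments of collate_fn; it returns a new function object, which is what the DataLoader receives as collate_fn.
Because the X returned here is a tuple, moving it to the GPU takes one extra conversion step. To avoid a major rewrite of the RunnerV3 class, I move the tensors inside X to the GPU in the model itself. The GPU code is in Section 2.2, Model Summary.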
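The effect of `functools.partial` can be seen in isolation with a framework-free sketch. The helper `pad_batch` below is a simplified, hypothetical stand-in for `collate_fn` (plain lists, no labels, no tensors); only the way `partial` binds the keyword arguments is the point.

```python
from functools import partial


def pad_batch(seqs, pad_val=0, max_seq_len=256):
    """Truncate each sequence to max_seq_len, then pad to the batch maximum."""
    seqs = [seq[:max_seq_len] for seq in seqs]
    max_len = max(len(seq) for seq in seqs)
    return [seq + [pad_val] * (max_len - len(seq)) for seq in seqs]


# partial returns a new callable with pad_val and max_seq_len already bound
pad_to_5 = partial(pad_batch, pad_val=0, max_seq_len=5)
padded = pad_to_5([[1, 2, 3, 4, 5, 6], [2, 4, 6]])
print(padded)
# [[1, 2, 3, 4, 5], [2, 4, 6, 0, 0]]
```

A DataLoader calls its collate function with the batch as the only argument, which is why the extra parameters have to be bound ahead of time.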
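2. Model Construction
The overall model structure is shown in the figure.
2.1 Pooling Operator
The pooling operator averages the hidden states of the bidirectional LSTM layer over all positions and uses the result as the representation of the whole sentence. We implement an AveragePooling operator for this: it first builds a mask matrix from the vector of sequence lengths to mask out the vectors at [PAD] positions, then sums the remaining vectors of each sequence and divides by the true sequence length.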
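import torch
import torch.nn as nn

class AveragePooling(nn.Module):
    def __init__(self):
        super(AveragePooling, self).__init__()

    def forward(self, sequence_output, sequence_length):
        # sequence_length: [batch_size] -> [batch_size, 1]
        sequence_length = sequence_length.unsqueeze(-1).float()
        max_len = sequence_output.shape[1]
        # mask: [batch_size, max_len], True at non-padding positions
        mask = torch.arange(max_len, device=sequence_output.device) < sequence_length
        # mask: [batch_size, max_len, 1] so it broadcasts over the hidden dimension
        mask = mask.float().unsqueeze(-1)
        # Zero out the vectors at padding positions
        sequence_output = sequence_output * mask
        # Sum over time and divide by the true length
        batch_mean_hidden = torch.sum(sequence_output, dim=1) / sequence_length
        return batch_mean_hidden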
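To check the arithmetic by hand, here is a framework-free sketch of the same masked mean for a single sequence. The function `masked_mean` and the toy hidden states are hypothetical illustrations, not part of the model code.

```python
def masked_mean(hidden_states, length):
    """Average the first `length` vectors and ignore the padded ones."""
    dim = len(hidden_states[0])
    total = [0.0] * dim
    for t, vec in enumerate(hidden_states):
        if t < length:  # positions at or beyond `length` are padding
            for d in range(dim):
                total[d] += vec[d]
    return [value / length for value in total]


# Three time steps with hidden size 2; the third step is padding
states = [[1.0, 2.0], [3.0, 4.0], [9.0, 9.0]]
mean = masked_mean(states, length=2)
print(mean)
# [2.0, 3.0]
```

Dividing by the true length (2) rather than the padded length (3) is what keeps the padding from diluting the sentence representation.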
2.2 Model Summary
This version adds GPU acceleration.
class Model_BiLSTM_FC(nn.Module):
    def __init__(self, num_embeddings, input_size, hidden_size, num_classes=2):
        super(Model_BiLSTM_FC, self).__init__()
        self.num_embeddings = num_embeddings
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_classes = num_classes
        # Embedding layer
        self.embedding_layer = nn.Embedding(num_embeddings, input_size, padding_idx=0)
        # Bidirectional LSTM layer; batch_first=True because the embeddings are
        # [batch_size, seq_len, input_size] (the default layout is [seq_len, batch_size, input_size])
        self.lstm_layer = nn.LSTM(input_size, hidden_size, batch_first=True, bidirectional=True)
        # Pooling layer
        self.average_layer = AveragePooling()
        # Output layer
        self.output_layer = nn.Linear(hidden_size * 2, num_classes)

    def forward(self, inputs):
        input_ids, sequence_length = inputs
        # Move the data to the device the model lives on
        device = next(self.parameters()).device
        input_ids = input_ids.to(device)
        sequence_length = sequence_length.to(device)
        # Look up the word embeddings
        inputs_emb = self.embedding_layer(input_ids)
        # Run the LSTM
        sequence_output, _ = self.lstm_layer(inputs_emb)
        # Pool the LSTM outputs
        batch_mean_hidden = self.average_layer(sequence_output, sequence_length)
        # Classify with the output layer
        logits = self.output_layer(batch_mean_hidden)
        return logits
3. Model Training
3.1 Model Training
import random
import time

import numpy as np
import torch.optim as optim

# RunnerV3, Accuracy and plot_training_loss_acc come from the earlier chapters and are not defined in this post

# Set the random seeds
torch.manual_seed(0)
np.random.seed(0)
random.seed(0)

# Training settings
num_epochs = 3
learning_rate = 0.001
num_embeddings = len(word2id_dict)  # vocabulary size
input_size = 256  # embedding dimension
hidden_size = 256  # LSTM hidden dimension

# Instantiate the model
model = Model_BiLSTM_FC(num_embeddings, input_size, hidden_size)
# Optimizer
optimizer = optim.Adam(model.parameters(), lr=learning_rate, betas=(0.9, 0.999))
# Loss function
loss_fn = nn.CrossEntropyLoss()
# Evaluation metric
metric = Accuracy()

# Use GPU 0 if a GPU is available, otherwise the CPU
use_gpu = torch.cuda.is_available()
device = torch.device('cuda:0') if use_gpu else torch.device('cpu')
# print(device)
model = model.to(device)

# Instantiate the Runner
runner = RunnerV3(model, optimizer, loss_fn, metric, device)
# Train the model
start_time = time.time()
runner.train(train_loader, dev_loader, num_epochs=num_epochs, eval_steps=10, log_steps=10, save_path="./checkpoints/best.pdparams")
end_time = time.time()
print("time: ", (end_time - start_time))
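3.2 Plotting the Accuracy and Loss Curves
# Loss and accuracy plots
# Output file name
fig_name = "./images/6.16.pdf"
# sample_step: sampling interval for the training loss, i.e. plot one point out of every sample_step points
# loss_legend_loc: legend position in the loss plot
# acc_legend_loc: legend position in the accuracy plot
plot_training_loss_acc(runner, fig_name, fig_size=(16,6), sample_step=10, loss_legend_loc="lower left", acc_legend_loc="lower right")
The figure below shows the loss curves of the text classification model during training and its accuracy curve on the validation set. In the loss plot, the solid line is the training loss and the dashed line is the validation loss. The training loss keeps decreasing as training proceeds, whereas the validation loss starts to rise after roughly 200 steps, which indicates overfitting; keeping the checkpoint that performs best on the validation set during training mitigates this. In the accuracy plot, validation accuracy first rises sharply, stops improving after about 200 steps, and then drops slightly because of overfitting.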
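The idea of keeping the best validation checkpoint can be sketched independently of the training framework. The function `best_checkpoint` and the score history below are hypothetical; this is not the code of RunnerV3, only an illustration of the selection rule.

```python
def best_checkpoint(dev_scores):
    """Return the (step, accuracy) pair with the highest accuracy.

    On ties the earliest step wins, because only a strictly better
    score replaces the current best.
    """
    best_step, best_acc = None, float("-inf")
    for step, acc in dev_scores:
        if acc > best_acc:
            best_step, best_acc = step, acc
    return best_step, best_acc


# Hypothetical validation history: accuracy peaks and then declines
history = [(100, 0.70), (200, 0.82), (300, 0.81), (400, 0.80)]
print(best_checkpoint(history))
# (200, 0.82)
```

In a real training loop the model weights would be saved at the moment a new best score is observed, so that the later, overfitted weights never overwrite them.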
4. Model Evaluation
# Evaluate the model
model_path = "./checkpoints/best.pdparams"
runner.load_model(model_path)
accuracy, _ = runner.evaluate(test_loader)
print(f"Evaluate on test set, Accuracy: {accuracy:.5f}")
5. Model Prediction
# Predict with the model
# Mapping from label ID to label name
id2label = {0: "negative", 1: "positive"}

# Input text
text = "this movie is so great. I watched it three times already"

# Process a single text; lowercase it first because the training data was lowercased
sentence = text.lower().split(" ")
words = [word2id_dict[word] if word in word2id_dict else word2id_dict['[UNK]'] for word in sentence]
words = words[:max_seq_len]
sequence_length = torch.tensor([len(words)], dtype=torch.long)  # sequence length
words = torch.tensor(words, dtype=torch.long).unsqueeze(0)  # add the batch dimension
logits = runner.predict((words, sequence_length))
# Index of the highest-scoring label
max_label_id = torch.argmax(logits, dim=-1).item()

# Show the predicted label
pred_label = id2label[max_label_id]
print("Label: ", pred_label)
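6. Unidirectional LSTM with Torch
First, modify the model definition: set bidirectional=False in nn.LSTM to use a unidirectional LSTM, and change the shape of the linear layer to [hidden_size, num_classes].
6.1 Model Modification: Returning Only the Final Hidden State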
class AveragePooling(nn.Module):
    def __init__(self):
        super(AveragePooling, self).__init__()

    def forward(self, sequence_output, sequence_length):
        # Expand sequence_length to [batch_size, 1]
        sequence_length = sequence_length.unsqueeze(-1).float()
        max_len = sequence_output.size(1)
        # Build the mask matrix from sequence_length
        mask = torch.arange(max_len, device=sequence_output.device).unsqueeze(0) < sequence_length
        mask = mask.float().unsqueeze(-1)
        # Mask out the padding positions
        sequence_output = sequence_output * mask
        # Average the vectors of the sequence
        batch_mean_hidden = sequence_output.sum(dim=1) / sequence_length
        return batch_mean_hidden

class Model_BiLSTM_FC(nn.Module):
    def __init__(self, num_embeddings, input_size, hidden_size, num_classes=2):
        super(Model_BiLSTM_FC, self).__init__()
        # Vocabulary size
        self.num_embeddings = num_embeddings
        # Word vector dimension
        self.input_size = input_size
        # Number of LSTM hidden units
        self.hidden_size = hidden_size
        # Number of sentiment classes
        self.num_classes = num_classes
        # Embedding layer
        self.embedding_layer = nn.Embedding(num_embeddings, input_size, padding_idx=0)
        # LSTM layer (unidirectional)
        self.lstm_layer = nn.LSTM(input_size, hidden_size, batch_first=True, bidirectional=False)
        # Pooling layer
        self.average_layer = AveragePooling()
        # Output layer
        self.output_layer = nn.Linear(hidden_size, num_classes)

    def forward(self, inputs):
        # Split the model input into the ID sequences and their lengths
        input_ids, sequence_length = inputs
        # Move the data to the device the model lives on
        device = next(self.parameters()).device
        input_ids = input_ids.to(device)
        sequence_length = sequence_length.to(device)
        # Look up the word embeddings
        inputs_emb = self.embedding_layer(input_ids)
        # Run the LSTM; sequence_length does not need to be passed explicitly
        sequence_output, _ = self.lstm_layer(inputs_emb)
        # Pool sequence_output
        batch_mean_hidden = self.average_layer(sequence_output, sequence_length)
        # Output the classification logits
        logits = self.output_layer(batch_mean_hidden)
        return logits
6.1.1 Model Training
# Set the random seeds
torch.manual_seed(0)
np.random.seed(0)
random.seed(0)

# Training settings
num_epochs = 3
learning_rate = 0.001
num_embeddings = len(word2id_dict)  # vocabulary size
input_size = 256  # embedding dimension
hidden_size = 256  # LSTM hidden dimension

# Instantiate the model
model = Model_BiLSTM_FC(num_embeddings, input_size, hidden_size)
# Optimizer
optimizer = optim.Adam(model.parameters(), lr=learning_rate, betas=(0.9, 0.999))
# Loss function
loss_fn = nn.CrossEntropyLoss()
# Evaluation metric
metric = Accuracy()

# Use GPU 0 if a GPU is available, otherwise the CPU
use_gpu = torch.cuda.is_available()
device = torch.device('cuda:0') if use_gpu else torch.device('cpu')
# print(device)
model = model.to(device)

# Instantiate the Runner
runner = RunnerV3(model, optimizer, loss_fn, metric, device)
# Train the model
start_time = time.time()
runner.train(train_loader, dev_loader, num_epochs=num_epochs, eval_steps=10, log_steps=10, save_path="./checkpoints/best_forward.pdparams")
end_time = time.time()
print("time: ", (end_time - start_time))
6.1.2 Model Evaluation
# Evaluate the model
model_path = "./checkpoints/best_forward.pdparams"
runner.load_model(model_path)
accuracy, _ = runner.evaluate(test_loader)
print(f"Evaluate on test set, Accuracy: {accuracy:.5f}")
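6.2 Model Modification: Hidden State Vectors at All Time Steps
The LSTM implemented by hand earlier returns only the hidden state of the last time step by default, but this experiment needs the hidden state vectors of all time steps. The hand-written LSTM therefore has to be changed so that it returns the whole sequence of hidden states.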
import torch.nn.init as init

class AveragePooling(nn.Module):
    def __init__(self):
        super(AveragePooling, self).__init__()

    def forward(self, sequence_output, sequence_length):
        sequence_length = sequence_length.unsqueeze(-1).float()
        max_len = sequence_output.shape[1]
        mask = torch.arange(max_len, device=sequence_output.device) < sequence_length
        mask = mask.float().unsqueeze(-1)
        sequence_output = sequence_output * mask
        batch_mean_hidden = torch.sum(sequence_output, dim=1) / sequence_length
        return batch_mean_hidden

class LSTM(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(LSTM, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        # Model parameters
        self.W_i = nn.Parameter(torch.empty(input_size, hidden_size))
        self.W_f = nn.Parameter(torch.empty(input_size, hidden_size))
        self.W_o = nn.Parameter(torch.empty(input_size, hidden_size))
        self.W_c = nn.Parameter(torch.empty(input_size, hidden_size))
        self.U_i = nn.Parameter(torch.empty(hidden_size, hidden_size))
        self.U_f = nn.Parameter(torch.empty(hidden_size, hidden_size))
        self.U_o = nn.Parameter(torch.empty(hidden_size, hidden_size))
        self.U_c = nn.Parameter(torch.empty(hidden_size, hidden_size))
        self.b_i = nn.Parameter(torch.empty(1, hidden_size))
        self.b_f = nn.Parameter(torch.empty(1, hidden_size))
        self.b_o = nn.Parameter(torch.empty(1, hidden_size))
        self.b_c = nn.Parameter(torch.empty(1, hidden_size))
        # Xavier uniform initialization for the weights, zeros for the biases
        init.xavier_uniform_(self.W_i)
        init.xavier_uniform_(self.W_f)
        init.xavier_uniform_(self.W_o)
        init.xavier_uniform_(self.W_c)
        init.xavier_uniform_(self.U_i)
        init.xavier_uniform_(self.U_f)
        init.xavier_uniform_(self.U_o)
        init.xavier_uniform_(self.U_c)
        init.zeros_(self.b_i)
        init.zeros_(self.b_f)
        init.zeros_(self.b_o)
        init.zeros_(self.b_c)

    def init_state(self, batch_size, device):
        hidden_state = torch.zeros(batch_size, self.hidden_size, dtype=torch.float32, device=device)
        cell_state = torch.zeros(batch_size, self.hidden_size, dtype=torch.float32, device=device)
        return hidden_state, cell_state

    def forward(self, inputs, states=None, sequence_length=None):
        # inputs: [batch_size, seq_len, input_size]
        batch_size, seq_len, input_size = inputs.shape
        # Create the initial states on the same device as the input
        device = inputs.device
        if states is None:
            states = self.init_state(batch_size, device)
        hidden_state, cell_state = states
        outputs = []
        # LSTM computation: input gate, forget gate, output gate, candidate state, cell state and hidden state
        for step in range(seq_len):
            input_step = inputs[:, step, :]
            I_gate = torch.sigmoid(torch.matmul(input_step, self.W_i) + torch.matmul(hidden_state, self.U_i) + self.b_i)
            F_gate = torch.sigmoid(torch.matmul(input_step, self.W_f) + torch.matmul(hidden_state, self.U_f) + self.b_f)
            O_gate = torch.sigmoid(torch.matmul(input_step, self.W_o) + torch.matmul(hidden_state, self.U_o) + self.b_o)
            C_tilde = torch.tanh(torch.matmul(input_step, self.W_c) + torch.matmul(hidden_state, self.U_c) + self.b_c)
            cell_state = F_gate * cell_state + I_gate * C_tilde
            hidden_state = O_gate * torch.tanh(cell_state)
            # Keep the hidden state of every time step
            outputs.append(hidden_state.unsqueeze(dim=1))
        outputs = torch.cat(outputs, dim=1)  # [batch_size, seq_len, hidden_size]
        return outputs

class Model_BiLSTM_FC(nn.Module):
    def __init__(self, num_embeddings, input_size, hidden_size, num_classes=2):
        super(Model_BiLSTM_FC, self).__init__()
        # Vocabulary size
        self.num_embeddings = num_embeddings
        # Word vector dimension
        self.input_size = input_size
        # Number of LSTM hidden units
        self.hidden_size = hidden_size
        # Number of sentiment classes
        self.num_classes = num_classes
        # Embedding layer
        self.embedding_layer = nn.Embedding(num_embeddings, input_size, padding_idx=0)
        # Hand-written LSTM layer
        self.lstm_layer = LSTM(input_size, hidden_size)
        # Pooling layer
        self.average_layer = AveragePooling()
        # Output layer
        self.output_layer = nn.Linear(hidden_size, num_classes)

    def forward(self, inputs):
        # Split the model input into the ID sequences and their lengths
        input_ids, sequence_length = inputs
        # Move the data to the device the model lives on
        device = next(self.parameters()).device
        input_ids = input_ids.to(device)
        sequence_length = sequence_length.to(device)
        # Look up the word embeddings
        inputs_emb = self.embedding_layer(input_ids)
        # Run the hand-written LSTM
        sequence_output = self.lstm_layer(inputs_emb)
        # Pool sequence_output
        batch_mean_hidden = self.average_layer(sequence_output, sequence_length)
        # Output the classification logits
        logits = self.output_layer(batch_mean_hidden)
        return logits
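The gate equations in the loop above can be traced by hand with a scalar version (input size 1, hidden size 1). The sketch below uses only the standard library; the function `lstm_step`, the parameter dictionary and the input values are hypothetical, and scalars replace the matrices of the real layer.

```python
import math


def sigmoid(x):
    return 1.0 / (1.0 + math.exp(-x))


def lstm_step(x, h, c, p):
    """One LSTM time step with scalar input, hidden state and cell state."""
    i_gate = sigmoid(p["W_i"] * x + p["U_i"] * h + p["b_i"])
    f_gate = sigmoid(p["W_f"] * x + p["U_f"] * h + p["b_f"])
    o_gate = sigmoid(p["W_o"] * x + p["U_o"] * h + p["b_o"])
    c_tilde = math.tanh(p["W_c"] * x + p["U_c"] * h + p["b_c"])
    c_new = f_gate * c + i_gate * c_tilde
    h_new = o_gate * math.tanh(c_new)
    return h_new, c_new


names = [prefix + gate for prefix in ("W_", "U_", "b_") for gate in "ifoc"]

# With all parameters at zero every gate equals 0.5 and the candidate is 0,
# so the cell state is simply halved: c_new = 0.5 * c
zero_params = {name: 0.0 for name in names}
h1, c1 = lstm_step(x=1.0, h=0.0, c=1.0, p=zero_params)

# Collect the hidden state of every time step, as the modified LSTM does
params = {name: 0.5 for name in names}  # hypothetical weights
h, c, outputs = 0.0, 0.0, []
for x in [1.0, -1.0, 0.5]:
    h, c = lstm_step(x, h, c, params)
    outputs.append(h)
print(len(outputs))
# 3
```

Appending `h` inside the loop, rather than returning only the last `h`, is the whole change this section is about.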
6.2.1 Model Training
# Set the random seeds
torch.manual_seed(0)
np.random.seed(0)
random.seed(0)

# Training settings
num_epochs = 3
learning_rate = 0.001
num_embeddings = len(word2id_dict)  # vocabulary size
input_size = 256  # embedding dimension
hidden_size = 256  # LSTM hidden dimension

# Instantiate the model
model = Model_BiLSTM_FC(num_embeddings, input_size, hidden_size)
# Optimizer
optimizer = optim.Adam(model.parameters(), lr=learning_rate, betas=(0.9, 0.999))
# Loss function
loss_fn = nn.CrossEntropyLoss()
# Evaluation metric
metric = Accuracy()

# Use GPU 0 if a GPU is available, otherwise the CPU
use_gpu = torch.cuda.is_available()
device = torch.device('cuda:0') if use_gpu else torch.device('cpu')
# print(device)
model = model.to(device)

# Instantiate the Runner
runner = RunnerV3(model, optimizer, loss_fn, metric, device)
# Train the model
start_time = time.time()
runner.train(train_loader, dev_loader, num_epochs=num_epochs, eval_steps=10, log_steps=10, save_path="./checkpoints/best_self_forward.pdparams")
end_time = time.time()
print("time: ", (end_time - start_time))
6.2.2 Model Evaluation
# Evaluate the model
model_path = "./checkpoints/best_self_forward.pdparams"
runner.load_model(model_path)
accuracy, _ = runner.evaluate(test_loader)
print(f"Evaluate on test set, Accuracy: {accuracy:.5f}")
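7. Text Classification with a Bidirectional LSTM and Attention
7.1 Attention Mechanism
The attention computation has two steps: first compute the attention distribution over all elements of the sequence, then use that distribution to compute a weighted average of the element representations, which gives the aggregated representation.
Given a task-related query vector q, first compute the attention distribution, i.e. the probability α_n of selecting the n-th input vector, where s(x, q) is the attention scoring function and N is the number of input vectors:
$$\alpha_n = \mathrm{softmax}\big(s(x_n, q)\big) = \frac{\exp\big(s(x_n, q)\big)}{\sum_{j=1}^{N} \exp\big(s(x_j, q)\big)}$$
With the attention distribution in hand, the input vectors are averaged with these weights to obtain the final representation of the whole sequence:
$$\mathrm{context} = \sum_{n=1}^{N} \alpha_n x_n$$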
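The two steps can be followed on a toy example without any framework. The sketch below assumes a dot-product scoring function; the names `softmax` and `attend`, the three input vectors and the query are hypothetical and chosen only so the numbers are easy to check.

```python
import math


def softmax(scores):
    """Numerically stable softmax over a list of floats."""
    m = max(scores)
    exps = [math.exp(s - m) for s in scores]
    total = sum(exps)
    return [e / total for e in exps]


def attend(vectors, query):
    # Step 1: score each vector against the query (dot product, an assumption
    # of this sketch) and normalize the scores into a distribution
    scores = [sum(x * q for x, q in zip(vec, query)) for vec in vectors]
    weights = softmax(scores)
    # Step 2: weighted average of the input vectors
    dim = len(vectors[0])
    context = [sum(w * vec[d] for w, vec in zip(weights, vectors)) for d in range(dim)]
    return weights, context


vectors = [[1.0, 0.0], [0.0, 1.0], [1.0, 1.0]]
query = [1.0, 1.0]
weights, context = attend(vectors, query)
print(weights, context)
```

The scores are 1, 1 and 2, so the third vector receives the largest weight and the first two receive equal weights; the weights sum to 1.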
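7.2 Model Construction
The model structure for this experiment is shown in the figure. The model consists of the following parts:
1) Embedding layer: converts the words of the input sentence into vector representations;
2) LSTM layer: uses a bidirectional LSTM network to model the contextual representation of each word in the sentence;
3) Attention layer: uses the attention mechanism to select and aggregate useful features from the output of the LSTM layer;
4) Linear layer: the output layer, which predicts the score of each class.
7.2.1 Attention Scoring Function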
class AdditiveScore(nn.Module):
    def __init__(self, hidden_size):
        super(AdditiveScore, self).__init__()
        self.fc_W = nn.Linear(hidden_size, hidden_size, bias=False)
        self.fc_U = nn.Linear(hidden_size, hidden_size, bias=False)
        self.fc_v = nn.Linear(hidden_size, 1, bias=False)
        # The query vector is initialized from a uniform distribution
        self.q = nn.Parameter(torch.FloatTensor(1, hidden_size).uniform_(-0.5, 0.5))

    def forward(self, inputs):
        """
        Input:
            - inputs: input matrix, shape=[batch_size, seq_len, hidden_size]
        Output:
            - scores: output matrix, shape=[batch_size, seq_len]
        """
        # inputs: [batch_size, seq_len, hidden_size]
        batch_size, seq_len, hidden_size = inputs.shape
        # scores: [batch_size, seq_len, hidden_size]
        scores = torch.tanh(self.fc_W(inputs) + self.fc_U(self.q))
        # scores: [batch_size, seq_len]
        scores = self.fc_v(scores).squeeze(-1)
        return scores

# # Test code
# torch.manual_seed(2021)
# inputs = torch.rand(1, 3, 3)
# additiveScore = AdditiveScore(hidden_size=3)
# scores = additiveScore(inputs)
# print(scores)
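Read off the forward pass above, the additive score of the n-th hidden state x_n can be written as a single formula. The symbols W, U and v are my shorthand for the weights of fc_W, fc_U and fc_v (none of which has a bias term), q is the learnable query vector, and L is the sequence length; this is a transcription of the code, not a formula quoted from the textbook.

```latex
s(x_n, q) = v^{\top} \tanh\left(W x_n + U q\right), \qquad n = 1, \dots, L
```

Because U q does not depend on n, it is computed once and broadcast over all positions, which is what adding a [1, hidden_size] tensor to a [batch_size, seq_len, hidden_size] tensor does in the code.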
7.2.2 Dot-Product Attention Scoring Operator
class DotProductScore(nn.Module):
    def __init__(self, hidden_size):
        super(DotProductScore, self).__init__()
        # Initialize a query vector from a uniform distribution
        self.q = nn.Parameter(torch.FloatTensor(hidden_size, 1).uniform_(-0.5, 0.5))

    def forward(self, inputs):
        """
        Input:
            - inputs: input matrix, shape=[batch_size, seq_len, hidden_size]
        Output:
            - scores: output matrix, shape=[batch_size, seq_len]
        """
        # inputs: [batch_size, seq_len, hidden_size]
        batch_size, seq_len, hidden_size = inputs.shape
        # scores: [batch_size, seq_len, 1]
        scores = torch.matmul(inputs, self.q)
        # scores: [batch_size, seq_len]
        scores = scores.squeeze(-1)
        return scores

# # Test code
# torch.manual_seed(2021)
# inputs = torch.rand(1, 3, 3)
# dotScore = DotProductScore(hidden_size=3)
# scores = dotScore(inputs)
# print(scores)
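For an input sequence X of shape B × L × D, where B is the batch size, L is the sequence length and D is the feature dimension, and a learnable task-related query vector q of dimension D, the dot-product model is:
$$s(X, q) = Xq$$
7.2.3 Computing the Attention Distribution
After the scores are computed, softmax normalizes them along the sequence dimension:
$$\alpha = \mathrm{softmax}\big(s(X, q)\big)$$
Here α is the normalized attention distribution, also called the attention weights.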
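In a padded batch the normalization should ignore the [PAD] positions. A common way to do this, and the one the Attention class below uses with -1e9, is to replace the scores at padded positions with a very large negative number before the softmax. The framework-free function `masked_softmax` and its inputs are hypothetical illustrations of that trick.

```python
import math


def masked_softmax(scores, valid_len, neg_inf=-1e9):
    """Softmax over the first valid_len scores; padded positions get weight 0."""
    masked = [s if i < valid_len else neg_inf for i, s in enumerate(scores)]
    m = max(masked)
    exps = [math.exp(s - m) for s in masked]  # exp of a huge negative underflows to 0.0
    total = sum(exps)
    return [e / total for e in exps]


# Positions 2 and 3 are padding, even though their raw scores are the largest
weights = masked_softmax([0.5, 0.5, 3.0, 3.0], valid_len=2)
print(weights)
# [0.5, 0.5, 0.0, 0.0]
```

Without the mask the two padded positions would have received most of the probability mass.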
7.2.4 Weighted Average
import torch.nn.functional as F

# Attention class
class Attention(nn.Module):
    def __init__(self, hidden_size, use_additive=False):
        super(Attention, self).__init__()
        self.use_additive = use_additive
        # Use either the additive model or the dot-product model
        if self.use_additive:
            self.scores = AdditiveScore(hidden_size)
        else:
            self.scores = DotProductScore(hidden_size)
        self._attention_weights = None

    def forward(self, X, valid_lens):
        """
        Input:
            - X: input matrix, shape=[batch_size, seq_len, hidden_size]
            - valid_lens: length vector, shape=[batch_size]
        Output:
            - context: output matrix, the attention-weighted average
        """
        # Keep every tensor on the same device as X
        device = X.device
        # scores: [batch_size, seq_len]
        scores = self.scores(X)
        # arrange: [1, seq_len]; for seq_len=4 it is [[0, 1, 2, 3]]
        arrange = torch.arange(scores.shape[1], dtype=torch.float32, device=device).unsqueeze(0)
        # valid_lens: [batch_size, 1]
        valid_lens = valid_lens.unsqueeze(1).to(device)
        # mask: [batch_size, seq_len], True at non-padding positions
        mask = arrange < valid_lens
        # Replace the scores at padding positions with a very large negative value
        y = torch.full(scores.shape, -1e9, dtype=scores.dtype, device=device)
        scores = torch.where(mask, scores, y)
        # attn_weights: [batch_size, seq_len]
        attn_weights = F.softmax(scores, dim=-1)
        self._attention_weights = attn_weights
        # context: [batch_size, 1, hidden_size]
        context = torch.matmul(attn_weights.unsqueeze(1), X)
        # context: [batch_size, hidden_size]
        context = torch.squeeze(context, dim=1)
        return context

    @property
    def attention_weights(self):
        return self._attention_weights

# # Test code
# torch.manual_seed(2021)
# X = torch.rand(1, 3, 3)
# valid_lens = torch.tensor([3])
# attention = Attention(hidden_size=3, use_additive=True)
# context = attention(X, valid_lens)
# print(context)
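The weighted average takes the attention distribution computed from the scores, multiplies each weight by the corresponding input vector, and sums the products.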
7.2.5 Model Summary
# Model_LSTMAttention class
class Model_LSTMAttention(nn.Module):
    def __init__(
        self,
        hidden_size,
        embedding_size,
        vocab_size,
        n_classes=10,
        n_layers=1,
        use_additive=False,
    ):
        super(Model_LSTMAttention, self).__init__()
        self.hidden_size = hidden_size
        self.embedding_size = embedding_size
        self.vocab_size = vocab_size
        self.n_classes = n_classes
        self.n_layers = n_layers
        # Embedding layer
        self.embedding = nn.Embedding(num_embeddings=self.vocab_size, embedding_dim=self.embedding_size)
        # Bidirectional LSTM layer
        self.lstm = nn.LSTM(
            input_size=self.embedding_size,
            hidden_size=self.hidden_size,
            num_layers=self.n_layers,
            bidirectional=True,
            batch_first=True,
        )
        # Output dimension of the bidirectional LSTM
        output_size = self.hidden_size * 2
        # Attention layer
        self.attention = Attention(output_size, use_additive=use_additive)
        # Classification layer
        self.cls_fc = nn.Linear(in_features=output_size, out_features=self.n_classes)

    def forward(self, inputs):
        input_ids, valid_lens = inputs
        # Move the data to the device the model lives on instead of relying on a global variable
        device = next(self.parameters()).device
        input_ids = input_ids.to(device)
        valid_lens = valid_lens.to(device)
        # Look up the word embeddings
        embedded_input = self.embedding(input_ids)
        # Encode the sequence with the LSTM
        lstm_out, (h_n, c_n) = self.lstm(embedded_input)
        # Aggregate with attention
        context = self.attention(lstm_out, valid_lens)
        # Predict the class scores
        logits = self.cls_fc(context)
        return logits
7.3 Additive Attention Model
7.3.1 Model Training
import matplotlib.pyplot as plt

# Visualization
def plot(runner, fig_name):
    plt.figure(figsize=(10, 5))
    plt.subplot(1, 2, 1)
    train_items = runner.train_step_losses[::30]
    train_steps = [x[0] for x in train_items]
    train_losses = [x[1] for x in train_items]
    plt.plot(train_steps, train_losses, color='#e4007f', label="Train loss")
    if runner.dev_losses[0][0] != -1:
        dev_steps = [x[0] for x in runner.dev_losses]
        dev_losses = [x[1] for x in runner.dev_losses]
        plt.plot(dev_steps, dev_losses, color='#f19ec2', linestyle='--', label="Dev loss")
    # Axis labels and legend
    plt.ylabel("loss", fontsize='large')
    plt.xlabel("step", fontsize='large')
    plt.legend(loc='upper right', fontsize='x-large')

    plt.subplot(1, 2, 2)
    # Validation accuracy curve
    if runner.dev_losses[0][0] != -1:
        plt.plot(dev_steps, runner.dev_scores, color='#f19ec2', linestyle="--", label="Dev accuracy")
    else:
        plt.plot(list(range(len(runner.dev_scores))), runner.dev_scores, color='#f19ec2', linestyle="--", label="Dev accuracy")
    # Axis labels and legend
    plt.ylabel("score", fontsize='large')
    plt.xlabel("step", fontsize='large')
    plt.legend(loc='lower right', fontsize='x-large')
    plt.savefig(fig_name)
    plt.show()

# Set the random seed so the experiment is repeatable
torch.manual_seed(2021)

# Training configuration
epochs = 2
vocab_size = len(word2id_dict)
hidden_size = 128
embedding_size = 128
n_classes = 2
n_layers = 1
learning_rate = 0.001

# Create the model
model_atten = Model_LSTMAttention(
    hidden_size,
    embedding_size,
    vocab_size,
    n_classes=n_classes,
    n_layers=n_layers,
    use_additive=True,
)

# Use GPU 0 if a GPU is available, otherwise the CPU
use_gpu = torch.cuda.is_available()
device = torch.device('cuda:0') if use_gpu else torch.device('cpu')
# print(device)
model_atten = model_atten.to(device)

# Loss function (cross-entropy loss)
criterion = nn.CrossEntropyLoss()
# Evaluation metric
metric = Accuracy()
# Optimizer
optimizer = optim.Adam(model_atten.parameters(), lr=learning_rate)

# Instantiate RunnerV3
runner = RunnerV3(model_atten, optimizer, criterion, metric, device)
save_path = "./checkpoints/model_best.pdparams"
start_time = time.time()
# Train
runner.train(
    train_loader,
    dev_loader,
    num_epochs=epochs,
    log_steps=10,
    eval_steps=10,
    save_path=save_path,
)
end_time = time.time()
print("Training time: {}".format(end_time - start_time))
# The original reused 'dotproduct-loss-acc.pdf' here, which the run in 7.4.1 would overwrite
plot(runner, 'additive-loss-acc.pdf')
7.3.2 Model Evaluation
model_path = "checkpoints/model_best.pdparams"
runner.load_model(model_path)
accuracy, _ = runner.evaluate(test_loader)
print(f"Evaluate on test set, Accuracy: {accuracy:.5f}")
7.4 Dot-Product Attention Model
7.4.1 Model Training
# Set the random seed so the experiment is repeatable
torch.manual_seed(2021)

# Training configuration
epochs = 2
vocab_size = len(word2id_dict)
hidden_size = 128
embedding_size = 128
n_classes = 2
n_layers = 1
learning_rate = 0.001

# Create the model
model_atten = Model_LSTMAttention(
    hidden_size,
    embedding_size,
    vocab_size,
    n_classes=n_classes,
    n_layers=n_layers,
    use_additive=False,
)

# Use GPU 0 if a GPU is available, otherwise the CPU
use_gpu = torch.cuda.is_available()
device = torch.device('cuda:0') if use_gpu else torch.device('cpu')
# print(device)
model_atten = model_atten.to(device)

# Loss function (cross-entropy loss)
criterion = nn.CrossEntropyLoss()
# Evaluation metric
metric = Accuracy()
# Optimizer
optimizer = optim.Adam(model_atten.parameters(), lr=learning_rate)

# Instantiate RunnerV3
runner = RunnerV3(model_atten, optimizer, criterion, metric, device)
save_path = "./checkpoints/dotproduct_model_best.pdparams"
start_time = time.time()
# Train
runner.train(
    train_loader,
    dev_loader,
    num_epochs=epochs,
    log_steps=10,
    eval_steps=10,
    save_path=save_path,
)
end_time = time.time()
print("Training time: {}".format(end_time - start_time))
plot(runner, 'dotproduct-loss-acc.pdf')
7.4.2 Model Evaluation
model_path = "checkpoints/dotproduct_model_best.pdparams"
runner.load_model(model_path)
accuracy, _ = runner.evaluate(test_loader)
print(f"Evaluate on test set, Accuracy: {accuracy:.5f}")
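(1) Without attention, the accuracy on the test set is 0.81856. With additive attention it is 0.84872, and with dot-product attention it is 0.85280. Both attention models outperform the model without attention.
(2) Comparing the two attention variants, dot-product attention is slightly more accurate in this run (0.85280 versus 0.84872).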
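The size of these differences is easier to judge in percentage points. The short calculation below uses only the three test accuracies reported above; the dictionary keys are labels I chose for readability.

```python
# Test accuracies reported in this section
results = {
    "no attention": 0.81856,
    "additive attention": 0.84872,
    "dot-product attention": 0.85280,
}

baseline = results["no attention"]
# Gain over the baseline in percentage points, rounded to three decimals
gains = {
    name: round((acc - baseline) * 100, 3)
    for name, acc in results.items()
    if name != "no attention"
}
print(gains)
# {'additive attention': 3.016, 'dot-product attention': 3.424}
```

Both attention variants gain about three points over the baseline, while the gap between the two variants themselves is roughly 0.4 points.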
7.5 Attention Visualization
import pandas as pd
import seaborn as sns
from matplotlib import rcParams

model_path = "checkpoints/dotproduct_model_best.pdparams"
# Load the model weights
model_state_dict = torch.load(model_path)
model_atten.load_state_dict(model_state_dict)

# word2id_dict and max_seq_len are assumed to be defined already
text = "this great science fiction film is really awesome"
# text = "This movie was craptacular"
# text = "I got stuck in traffic on the way to the theater"

# Tokenize
sentence = text.split(" ")
# Map the words to IDs; unknown words use [UNK], the same token as in IMDBDataset
tokens = [
    word2id_dict[word] if word in word2id_dict else word2id_dict["[UNK]"]
    for word in sentence
]
# Keep the first max_seq_len words
tokens = tokens[:max_seq_len]
# Sequence length
seq_len = torch.tensor([len(tokens)])
# Convert to a PyTorch tensor
input_ids = torch.tensor(tokens, dtype=torch.long).unsqueeze(0)
# Prepare the input
inputs = [input_ids, seq_len]

# Put the model on the right device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model_atten.to(device)
# Switch the model to evaluation mode
model_atten.eval()
# Disable gradient computation
with torch.no_grad():
    # Predict
    pred_prob = model_atten(inputs)
# Extract the attention weights
atten_weights = model_atten.attention.attention_weights

# Print the results
print("Input text: {}".format(text))
print("As IDs: {}".format(input_ids.numpy()))
print("Learned attention weights: {}".format(atten_weights.cpu().numpy()))

rcParams.update({'figure.autolayout': True})
# Split the text into words
list_words = text.split(" ")
# Convert the attention weights to a list
data_attention = atten_weights.cpu().numpy().tolist()
# Keep the first max_seq_len words so they line up with the weights
list_words = list_words[:max_seq_len]
# Put the weights in a DataFrame whose column names are the words
d = pd.DataFrame(data=data_attention, columns=list_words)
f, ax = plt.subplots(figsize=(20, 1.5))
# Visualize with a heatmap
# sns.heatmap(d, vmin=0, vmax=0.4, ax=ax)
# sns.heatmap(d, vmin=0, vmax=0.4, ax=ax, cmap="OrRd")
my_colors = ['#e4007f', '#f19ec2', '#e86096', '#eb7aaa', '#f6c8dc', '#f5f5f5', '#000000', '#f7d2e2']
sns.heatmap(d, vmin=0, vmax=0.4, ax=ax, cmap=my_colors)
# Rotate the y-axis labels by 360 degrees, i.e. keep them horizontal
label_y = ax.get_yticklabels()
plt.setp(label_y, rotation=360, horizontalalignment="right")
# Keep the x-axis labels unrotated
label_x = ax.get_xticklabels()
plt.setp(label_x, rotation=0, horizontalalignment="right", fontsize=20)
plt.savefig('att-vis.pdf')
plt.show()
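The output is shown in the figure. Darker colors mean higher weights, and the figure shows that the words with the highest attention weights are "great" and "awesome".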
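Besides reading the heatmap, the same conclusion can be checked numerically by ranking the words by weight. In the sketch below the helper `top_words` is hypothetical and the weights are invented for illustration; the real values come from model_atten.attention.attention_weights and will differ.

```python
def top_words(words, weights, k=2):
    """Return the k (word, weight) pairs with the highest attention weight."""
    pairs = sorted(zip(words, weights), key=lambda pair: pair[1], reverse=True)
    return pairs[:k]


words = "this great science fiction film is really awesome".split(" ")
# Hypothetical weights, invented for illustration only; they sum to 1
weights = [0.02, 0.35, 0.05, 0.05, 0.08, 0.02, 0.13, 0.30]
print(top_words(words, weights))
# [('great', 0.35), ('awesome', 0.3)]
```

With real weights, `weights` would be one row of the attention matrix converted to a Python list, for example the first row of `data_attention` in the code above.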