目录
1 数据
2 模型构建
2.1 注意力打分函数
2.1.1 加性模型
2.1.2 点积模型
2.2 注意力分布计算
2.3 加权平均
2.4 完整注意力机制的实现
2.5 模型汇总
3 模型训练
3.1 使用加性注意力模型
3.2 使用点积注意力模型
4 模型评价
4.1 使用加性注意力模型
4.2 使用点积注意力模型
5 注意力可视化
6 完整代码
7 参考链接
1 数据
本实验使用IMDB电影评论数据集,与实验16相同。所需代码如下:
# 加载IMDB数据集
def load_imdb_data(path):
    assert os.path.exists(path)
    # 初始化数据集列表
    trainset, devset, testset = [], [], []
    # 加载训练集数据
    for label in ['pos', 'neg']:
        label_path = os.path.join(path, 'train', label)
        for filename in os.listdir(label_path):
            if filename.endswith('.txt'):
                with open(os.path.join(label_path, filename), 'r', encoding='utf-8') as f:
                    sentence = f.read().strip().lower()  # 读取并处理每个评论
                    trainset.append((sentence, label))
    # 加载测试集数据
    for label in ['pos', 'neg']:
        label_path = os.path.join(path, 'test', label)
        for filename in os.listdir(label_path):
            if filename.endswith('.txt'):
                with open(os.path.join(label_path, filename), 'r', encoding='utf-8') as f:
                    sentence = f.read().strip().lower()  # 读取并处理每个评论
                    testset.append((sentence, label))
    # 随机拆分测试集的一半作为验证集
    random.shuffle(testset)   # 打乱测试集顺序
    random.shuffle(trainset)
    split_index = len(testset) // 2    # 计算拆分索引
    devset = testset[:split_index]     # 选择测试集前一半作为验证集
    testset = testset[split_index:]    # 剩下的部分作为测试集
    return trainset, devset, testset

# 加载词汇表文件,创建单词到ID的映射字典
def load_vocab(path):
    """
    功能:从给定路径加载词汇表文件,并创建单词到ID的映射字典
    参数:
        path: 词汇表文件的路径
    返回值:
        word2id: 单词到ID的映射字典,形如 {'word1': 0, 'word2': 1, ...}
    """
    assert os.path.exists(path)  # 确保词表文件路径存在
    words = []  # 初始化空列表,存储词表中的单词
    with open(path, "r", encoding="utf-8") as f:  # 打开文件并读取内容
        words = f.readlines()  # 读取文件中的所有行
    words = [word.strip() for word in words if word.strip()]  # 移除每个单词的前后空白字符并去掉空行
    word2id = dict(zip(words, range(len(words))))  # 创建一个字典,将单词与对应的ID映射
    return word2id  # 返回这个字典

# 自定义数据集类,用于处理IMDB数据集中的文本和标签
class IMDBDataset(Dataset):
    def __init__(self, examples, word2id_dict):
        super(IMDBDataset, self).__init__()
        self.word2id_dict = word2id_dict
        self.examples = self.words_to_id(examples)

    '''
    将文本句子中的单词转换为对应的ID,处理不在词汇表中的单词(用[UNK]对应的ID替代),
    并将标签转换为数值形式('pos' -> 1, 'neg' -> 0)
    '''
    def words_to_id(self, examples):
        tmp_examples = []
        for idx, example in enumerate(examples):
            seq, label = example
            # 将单词映射为字典索引的ID, 对于词典中没有的单词用[UNK]对应的ID进行替代
            seq = [self.word2id_dict.get(word, self.word2id_dict['[UNK]']) for word in seq.split(" ")]
            seq_tensor = torch.tensor(seq).to(device)  # 将张量移到正确设备
            # 映射标签: 'pos' -> 1, 'neg' -> 0
            label = 1 if label == 'pos' else 0
            tmp_examples.append([seq_tensor, label])
        return tmp_examples

    def __getitem__(self, idx):  # 获取指定索引位置的示例数据
        seq, label = self.examples[idx]
        return seq, label

    def __len__(self):  # 返回数据集的大小
        return len(self.examples)


def collate_fn(batch_data, pad_val=1):
    seqs = []
    labels = []
    lens = []
    for seq, label in batch_data:
        seqs.append(seq)
        labels.append(label)  # 直接添加label即可,不需要再额外嵌套一层列表
        lens.append(len(seq))
    max_len = max(lens)
    # 对序列进行填充操作,使其长度统一为批次中的最大长度
    for i in range(len(seqs)):
        # 填充部分与seqs[i]同为1维序列,形状为 (需要填充的长度,)
        padding_tensor = torch.full((max_len - len(seqs[i]),), pad_val, dtype=torch.long).to(device)
        # 使用torch.cat在维度0(序列长度维度)上拼接
        seqs[i] = torch.cat([seqs[i], padding_tensor], dim=0)
    # 将序列列表转换为PyTorch张量
    seqs_tensor = list2tensor(seqs).to(device)
    lens_tensor = torch.tensor(lens).to(device)
    labels_tensor = torch.tensor(labels).unsqueeze(1).to(device)
    return (seqs_tensor, lens_tensor), labels_tensor

# ===============封装dataloader=========================
# 加载IMDB数据集
train_data, dev_data, test_data = load_imdb_data("./dataset/")
# 加载词汇表字典
word2id_dict = load_vocab("./dataset/imdb.vocab")
# 实例化Dataset
train_set = IMDBDataset(train_data, word2id_dict)
dev_set = IMDBDataset(dev_data, word2id_dict)
test_set = IMDBDataset(test_data, word2id_dict)
max_seq_len = 256
batch_size = 128
train_loader = torch.utils.data.DataLoader(train_set, batch_size=batch_size, shuffle=True,
                                           drop_last=False, collate_fn=collate_fn)
dev_loader = torch.utils.data.DataLoader(dev_set, batch_size=batch_size, shuffle=False,
                                         drop_last=False, collate_fn=collate_fn)
test_loader = torch.utils.data.DataLoader(test_set, batch_size=batch_size, shuffle=False,
                                          drop_last=False, collate_fn=collate_fn)
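可以先取一个batch,检查collate_fn的输出形状是否符合预期(一个最小的检查示例,序列长度维度取决于该批次中最长的评论):

# 取一个batch,检查collate_fn的输出形状
(seqs, lens), labels = next(iter(train_loader))
print(seqs.shape)    # [batch_size, 该批次的最大序列长度]
print(lens.shape)    # torch.Size([128])
print(labels.shape)  # torch.Size([128, 1])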
2 模型构建
模型框架如下:
(1) 嵌入层:将输入句子中的词语转换为向量表示;
(2) LSTM层:基于双向LSTM网络来建模句子中词语的上下文表示;
(3) 注意力层:使用注意力机制来从LSTM层的输出中筛选和聚合有效的特征;
(4) 线性层:预测对应的类别得分。
利用注意力机制可以进行更好的信息聚合:只需要从输入序列X中选择与任务相关的信息,加权聚合为整个序列的表示。
下面分别实现注意力层中的注意力打分函数、注意力分布计算和加权平均三个模块,三者的计算关系概括如下。
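这三个模块对应的计算过程为(其中q为可学习的查询向量,x_n为第n个位置的特征向量):

打分:s_n = s(x_n, q)
分布:α_n = softmax(s_n)(对[PAD]位置屏蔽后再计算)
加权平均:att(X, q) = Σ_n α_n · x_n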
2.1 注意力打分函数
2.1.1 加性模型
'''
注意力打分函数
'''
# ===============加性==================
class AdditiveScore(nn.Module):
    def __init__(self, hidden_size):
        super(AdditiveScore, self).__init__()
        self.fc_W = nn.Linear(hidden_size, hidden_size, bias=False)
        self.fc_U = nn.Linear(hidden_size, hidden_size, bias=False)
        self.fc_v = nn.Linear(hidden_size, 1, bias=False)
        # 查询向量在[-0.5, 0.5]均匀分布上随机初始化,注册为可训练参数(随模型移动设备、参与训练)
        self.q = nn.Parameter(torch.rand(1, hidden_size) - 0.5)

    def forward(self, inputs):
        """
        输入:
            - inputs:输入矩阵,shape=[batch_size, seq_len, hidden_size]
        输出:
            - scores:输出矩阵,shape=[batch_size, seq_len]
        """
        batch_size, seq_len, hidden_size = inputs.shape
        # 将查询向量扩展到与输入批次维度匹配: [batch_size, 1, hidden_size]
        q_expanded = self.q.expand(batch_size, -1, hidden_size)
        # scores: [batch_size, seq_len, hidden_size]
        scores = torch.tanh(self.fc_W(inputs) + self.fc_U(q_expanded))
        # scores: [batch_size, seq_len]
        scores = self.fc_v(scores).squeeze(-1)
        return scores
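加性模型的打分函数为 s(x, q) = vᵀ tanh(Wx + Uq)。可以仿照第6节完整代码中注释掉的测试,验证打分函数的输出形状:

torch.manual_seed(2021)
inputs = torch.rand(1, 3, 3)                  # [batch_size=1, seq_len=3, hidden_size=3]
additiveScore = AdditiveScore(hidden_size=3)
scores = additiveScore(inputs)
print(scores.shape)  # torch.Size([1, 3]),每个时刻对应一个打分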
2.1.2 点积模型
# =================点积========================
class DotProductScore(nn.Module):
    def __init__(self, hidden_size):
        super(DotProductScore, self).__init__()
        # 查询向量在[-0.5, 0.5]均匀分布上随机初始化,注册为可训练参数
        self.q = nn.Parameter(torch.rand(hidden_size, 1) - 0.5)

    def forward(self, inputs):
        """
        输入:
            - inputs:输入矩阵,shape=[batch_size, seq_len, hidden_size]
        输出:
            - scores:输出矩阵,shape=[batch_size, seq_len]
        """
        # 获取输入张量的形状信息
        batch_size, seq_length, hidden_size = inputs.shape
        # 点积运算: [batch_size, seq_len, hidden_size] x [hidden_size, 1] -> [batch_size, seq_len, 1]
        scores = torch.matmul(inputs, self.q)
        # 去除最后一维(大小为1): [batch_size, seq_len]
        scores = scores.squeeze(-1)
        return scores
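点积模型直接以 s(x, q) = xᵀq 作为打分,不引入额外的线性层,参数更少、计算更快。同样可以做一个形状测试:

torch.manual_seed(2021)
inputs = torch.rand(1, 3, 3)              # [batch_size=1, seq_len=3, hidden_size=3]
dotScore = DotProductScore(hidden_size=3)
scores = dotScore(inputs)
print(scores.shape)  # torch.Size([1, 3])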
2.2 注意力分布计算
[PAD]位置的打分被填充为-1e9之后,对应的Softmax输出变为0,相当于把[PAD]这些没有实际意义的填充符屏蔽掉,只对剩下的元素计算注意力分布,从而消除填充符对注意力计算的影响。
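下面用一个最小示例演示这一点(打分数值是随意取的演示值):

import torch
import torch.nn.functional as F

scores = torch.tensor([[0.5, 1.2, 0.3, 0.8]])  # 4个位置的打分,假设后2个位置是[PAD]
valid_lens = torch.tensor([2])                 # 有效长度为2
arrange = torch.arange(scores.shape[1]).unsqueeze(0)  # [[0, 1, 2, 3]]
mask = arrange < valid_lens.unsqueeze(1)              # [[True, True, False, False]]
masked_scores = torch.where(mask, scores, torch.full_like(scores, -1e9))
print(F.softmax(masked_scores, dim=-1))
# tensor([[0.3318, 0.6682, 0.0000, 0.0000]]),[PAD]位置的注意力权重为0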
2.3 加权平均
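加权平均即以注意力分布为权重,对各时刻的特征向量求和,核心只是一步批量矩阵乘法,完整实现见2.4。下面是一个独立的小示例(数据为随机生成的演示值):

import torch

torch.manual_seed(0)
X = torch.rand(2, 4, 3)                                 # [batch_size, seq_len, hidden_size]
attn_weights = torch.softmax(torch.rand(2, 4), dim=-1)  # [batch_size, seq_len],每行权重和为1
context = torch.matmul(attn_weights.unsqueeze(1), X)    # [batch_size, 1, hidden_size]
context = context.squeeze(1)                            # [batch_size, hidden_size]
print(context.shape)  # torch.Size([2, 3])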
2.4 完整注意力机制的实现
'''
注意力层
'''
class Attention(nn.Module):
    def __init__(self, hidden_size, use_additive=False):
        super(Attention, self).__init__()
        self.use_additive = use_additive
        # 使用加性模型或者点积模型
        if self.use_additive:
            self.scores = AdditiveScore(hidden_size)
        else:
            self.scores = DotProductScore(hidden_size)
        self._attention_weights = None

    def forward(self, X, valid_lens):
        """
        输入:
            - X:输入矩阵,shape=[batch_size, seq_len, hidden_size]
            - valid_lens:长度矩阵,shape=[batch_size]
        输出:
            - context:注意力加权平均的结果,shape=[batch_size, hidden_size]
        """
        # scores: [batch_size, seq_len]
        scores = self.scores(X)
        # arrange: [1, seq_len],比如seq_len=4时为 [[0, 1, 2, 3]]
        arrange = torch.arange(scores.shape[1], dtype=torch.float32, device=scores.device).unsqueeze(0)
        # valid_lens: [batch_size, 1]
        valid_lens = valid_lens.unsqueeze(1)
        # mask: [batch_size, seq_len],有效位置为True
        mask = arrange < valid_lens
        # 无效位置([PAD])的打分置为-1e9,softmax后权重趋于0
        y = torch.full_like(scores, -1e9)
        scores = torch.where(mask, scores, y)
        # attn_weights: [batch_size, seq_len]
        attn_weights = F.softmax(scores, dim=-1)
        self._attention_weights = attn_weights
        # context: [batch_size, 1, hidden_size]
        context = torch.matmul(attn_weights.unsqueeze(1), X)
        # context: [batch_size, hidden_size]
        context = context.squeeze(1)
        return context

    @property
    def attention_weights(self):
        return self._attention_weights
测试一下:
# 使用不同的打分函数进行测试
torch.manual_seed(2021)
X = torch.rand(1, 3, 3)
valid_lens = torch.tensor([2])
print("输入向量为 {}".format(X))
add_atten = Attention(hidden_size=3, use_additive=True)
context1 = add_atten(X, valid_lens)
dot_atten = Attention(hidden_size=3, use_additive=False)
context2 = dot_atten(X, valid_lens)
print("============使用加性打分函数时:=============")
print("注意力的输出为 : {}".format(context1))
print("注意力权重为 : {}".format(add_atten.attention_weights))
print("============使用点积打分函数时:=============")
print("注意力的输出为 : {}".format(context2))
print("注意力权重为 : {}".format(dot_atten.attention_weights))
输出结果:
输入向量为 tensor([[[0.4490, 0.7986, 0.9256],
         [0.2854, 0.8692, 0.6329],
         [0.1390, 0.7372, 0.5650]]])
============使用加性打分函数时:=============
注意力的输出为 : tensor([[0.3651, 0.8348, 0.7756]], grad_fn=<SqueezeBackward1>)
注意力权重为 : tensor([[0.4875, 0.5125, 0.0000]], grad_fn=<SoftmaxBackward0>)
============使用点积打分函数时:=============
注意力的输出为 : tensor([[0.3726, 0.8316, 0.7889]])
注意力权重为 : tensor([[0.5329, 0.4671, 0.0000]])
2.5 模型汇总
首先是嵌入层,将输入句子中的词语向量化;接着用双向LSTM学习句子的上下文特征;随后接入注意力机制进行特征筛选;最后接入输出层,得到该句子的分类结果。
'''
模型汇总
'''
class Model_LSTMAttention(nn.Module):
    def __init__(
        self,
        hidden_size,
        embedding_size,
        vocab_size,
        n_classes=10,
        n_layers=1,
        use_additive=False,
    ):
        super(Model_LSTMAttention, self).__init__()
        # LSTM单元的隐藏神经元数量,也是hidden和cell状态向量的维度
        self.hidden_size = hidden_size
        # 词向量的维度
        self.embedding_size = embedding_size
        # 词典的单词数量
        self.vocab_size = vocab_size
        # 文本分类的类别数量
        self.n_classes = n_classes
        # LSTM的层数
        self.n_layers = n_layers
        # 定义embedding层
        self.embedding = nn.Embedding(num_embeddings=self.vocab_size, embedding_dim=self.embedding_size)
        # 定义LSTM,用来编码句子;batch_first=True使输入输出形状为[batch, seq, feature],与embedding输出一致
        self.lstm = nn.LSTM(
            input_size=self.embedding_size,
            hidden_size=self.hidden_size,
            num_layers=self.n_layers,
            batch_first=True,
            bidirectional=True,  # 双向LSTM
        )
        # 双向LSTM的输出维度
        output_size = self.hidden_size * 2
        # 定义Attention层
        self.attention = Attention(output_size, use_additive=use_additive)
        # 定义分类层,将语义向量映射到相应的类别
        self.cls_fc = nn.Linear(in_features=output_size, out_features=self.n_classes)

    def forward(self, inputs):
        input_ids, valid_lens = inputs
        # 获取词向量: [batch_size, seq_len, embedding_size]
        embedded_input = self.embedding(input_ids)
        # 使用LSTM进行语义编码: output为[batch_size, seq_len, hidden_size*2]
        output, (last_hidden, last_cell) = self.lstm(embedded_input)
        # 进行Attention加权聚合: [batch_size, hidden_size*2]
        output = self.attention(output, valid_lens)
        # 通过分类线性层,获得类别得分
        logits = self.cls_fc(output)
        return logits
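可以用随机数据做一次前向传播,检查各层维度是否衔接正确(一个最小的形状检查,其中词表大小、隐藏维度等均为演示用的假设值):

torch.manual_seed(2021)
demo_model = Model_LSTMAttention(hidden_size=8, embedding_size=8, vocab_size=100,
                                 n_classes=2, n_layers=1, use_additive=True)
input_ids = torch.randint(0, 100, (2, 5))  # [batch_size=2, seq_len=5]
valid_lens = torch.tensor([5, 3])          # 每个样本的有效长度
logits = demo_model((input_ids, valid_lens))
print(logits.shape)  # torch.Size([2, 2])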
3 模型训练
基于RunnerV3来进行模型训练、评价和预测。 使用交叉熵损失函数,并用Adam作为优化器来训练,分别使用加性注意力模型和点积注意力模型。适当调参,并保存准确率最高的模型作为最佳模型。
3.1 使用加性注意力模型
'''
模型训练
'''
from Runner import RunnerV3, Accuracy, plot_training_loss_acc
# 迭代的epoch数
epochs = 2
# 词汇表的大小
vocab_size = len(word2id_dict)
# lstm的输出单元的大小
hidden_size = 128
# embedding的维度
embedding_size = 128
# 类别数
n_classes = 2
# lstm的层数
n_layers = 1
# 学习率
learning_rate = 0.05
# 定义交叉熵损失
criterion = nn.CrossEntropyLoss()
# 指定评价指标
metric = Accuracy()
# 实例化基于LSTM的注意力模型
model_atten = Model_LSTMAttention(
    hidden_size,
    embedding_size,
    vocab_size,
    n_classes=n_classes,
    n_layers=n_layers,
    use_additive=True,
)
model_atten.to(device)
# 定义优化器
optimizer = torch.optim.Adam(model_atten.parameters(), lr=learning_rate)
# 实例化
runner = RunnerV3(model_atten, optimizer, criterion, metric)
save_path = "./checkpoint2/model_best.pth"
start_time = time.time()
# 训练,假设train_loader和dev_loader是已经正确定义好的PyTorch的DataLoader对象,用于加载训练集和验证集数据
runner.train(
    train_loader,
    dev_loader,
    num_epochs=epochs,
    log_steps=10,
    eval_steps=10,
    save_path=save_path,
)
end_time = time.time()
print("训练时间:{}".format(end_time - start_time))plot_training_loss_acc(runner, fig_size=(16, 6), sample_step=10, loss_legend_loc="lower left",acc_legend_loc="lower right")
epoch = 3 时的运行结果:
[Train] epoch: 0/3, step: 0/588, loss: 0.69758
[Train] epoch: 0/3, step: 10/588, loss: 0.69123
......
[Evaluate] dev score: 0.83672, dev loss: 0.36764
[Train] epoch: 2/3, step: 560/588, loss: 0.26399
[Evaluate] dev score: 0.83840, dev loss: 0.36225
[Train] epoch: 2/3, step: 570/588, loss: 0.27535
[Evaluate] dev score: 0.83944, dev loss: 0.37281
[Train] epoch: 2/3, step: 580/588, loss: 0.24059
[Evaluate] dev score: 0.83896, dev loss: 0.37621
[Evaluate] dev score: 0.84064, dev loss: 0.36455
[Evaluate] best accuracy performence has been updated: 0.83992 --> 0.84064
[Train] Training done!
训练时间:527.9332256317139
观察损失曲线尚未收敛,还可以增大epoch,于是将epoch调至10:
.....
[Evaluate] dev score: 0.80552, dev loss: 1.08909
[Train] epoch: 9/10, step: 1950/1960, loss: 0.00096
[Evaluate] dev score: 0.80632, dev loss: 1.09985
[Evaluate] dev score: 0.80576, dev loss: 1.11132
[Train] Training done!
训练时间:2149.4201123714447
增大epoch至10后,训练集损失一直下降,而验证集损失在700 step之后呈上升趋势,很明显出现了过拟合现象。推断在lr = 0.001的情况下,合适的epoch应为3~4。
3.2 使用点积注意力模型
只需要把模型实例化时的use_additive参数改为False:
model_atten = Model_LSTMAttention(
    hidden_size,
    embedding_size,
    vocab_size,
    n_classes=n_classes,
    n_layers=n_layers,
    use_additive=False,
)
epoch = 2,lr = 0.001,运行结果:
....
[Train] epoch: 1/2, step: 360/392, loss: 0.10630
[Evaluate] dev score: 0.82696, dev loss: 0.46003
[Train] epoch: 1/2, step: 370/392, loss: 0.30635
[Evaluate] dev score: 0.82600, dev loss: 0.43685
[Train] epoch: 1/2, step: 380/392, loss: 0.25039
[Evaluate] dev score: 0.82208, dev loss: 0.49186
[Train] epoch: 1/2, step: 390/392, loss: 0.19082
[Evaluate] dev score: 0.81616, dev loss: 0.46762
[Evaluate] dev score: 0.82280, dev loss: 0.44383
[Train] Training done!
训练时间:52.858378648757935
4 模型评价
'''
模型评价
'''
model_path = "./checkpoint2/model_best.pth"
runner.load_model(model_path)
accuracy, _ = runner.evaluate(test_loader)
print(f"Evaluate on test set, Accuracy: {accuracy:.5f}")
4.1 使用加性注意力模型
epoch = 3:
Evaluate on test set, Accuracy: 0.83792
epoch = 10:
Evaluate on test set, Accuracy: 0.83432
epoch = 3,lr = 0.05:
Evaluate on test set, Accuracy: 0.85592
此时准确率达到了0.8559!与不使用注意力机制相比提高了约0.01。
4.2 使用点积注意力模型
epoch = 2,lr = 0.001:
Evaluate on test set, Accuracy: 0.77632
准确率只有0.77左右。通过实验对比,点积注意力模型的效果不如加性注意力模型。
5 注意力可视化
为了验证注意力机制学到了什么,把加性注意力的权重提取出来,然后进行可视化分析。
# ============注意力可视化===============
model_path = "checkpoint2/model_best.pth" # 将后缀修改为.pth,这是PyTorch模型保存的常见后缀格式
model_atten = Model_LSTMAttention(
    hidden_size,
    embedding_size,
    vocab_size,
    n_classes=n_classes,
    n_layers=n_layers,
    use_additive=True,  # 可视化的是加性注意力,需与训练保存的模型一致
)
model_state_dict = torch.load(model_path) # 使用torch.load加载模型参数
model_atten.load_state_dict(model_state_dict) # 将加载的参数应用到模型中
text = "this great science fiction film is really awesome"
# text = "This movie was craptacular"
# text = "I got stuck in traffic on the way to the theater"
# 分词
sentence = text.split(" ")
# 词映射成ID的形式
tokens = [word2id_dict[word] if word in word2id_dict else word2id_dict["[UNK]"]
          for word in sentence]  # 词表外的单词用[UNK]对应的ID替代,与IMDBDataset保持一致
# 取前max_seq_len的单词
tokens = tokens[:max_seq_len]
# 序列长度
seq_len = torch.tensor([len(tokens)], dtype=torch.long).to(device)  # 序列长度,移到相应设备
# 转换成PyTorch的Tensor,并增加一个批次维度(unsqueeze(0)的作用)
input_ids = torch.tensor(tokens, dtype=torch.long).unsqueeze(0).to(device) # 确保移到相应设备上(如GPU)
inputs = [input_ids, seq_len]
# 模型开启评估模式
model_atten.eval()
# 设置不求梯度,使用torch.no_grad上下文管理器
with torch.no_grad():
    # 预测输出
    pred_prob = model_atten(inputs)
# 提取注意力权重
atten_weights = model_atten.attention.attention_weights
print("输入的文本为:{}".format(text))
print("转换成id的形式为:{}".format(input_ids.cpu().numpy())) # 将张量移到CPU上再转为numpy数组进行打印输出
print("训练的注意力权重为:{}".format(atten_weights.cpu().numpy())) # 同样移到CPU上转为numpy数组打印
运行结果:
输入的文本为:this great science fiction film is really awesome
转换成id的形式为:[[ 11 85 1096 1261 19 7 64 1145]]
训练的注意力权重为:[[9.7763856e-05 9.0048057e-01 1.3302587e-02 1.8970986e-04 2.9538744e-04
  1.3915513e-03 3.0419938e-05 8.4211990e-02]]
[[9.776385559234768e-05, 0.9004805684089661, 0.01330258697271347, 0.0001897098554763943, 0.00029538743547163904, 0.0013915513409301639, 3.0419938411796466e-05, 0.08421199023723602]]
import seaborn as sns
import pandas as pd
from matplotlib import rcParams
rcParams.update({'figure.autolayout': True})
# 对文本进行分词,得到过滤后的词
list_words = text.split(" ")
# 移到CPU上转为numpy数组
atten_weights = atten_weights.cpu().numpy()
# 提取注意力权重,转换成list
data_attention = atten_weights.tolist()
print(data_attention)
# 取出前max_seq_len个词,与注意力权重一一对应
list_words = list_words[:max_seq_len]
# 把权重转换为DataFrame,列名为单词
d = pd.DataFrame(data=data_attention, columns=list_words)
f, ax = plt.subplots(figsize=(20, 1.5))
sns.heatmap(d, vmin=0, vmax=0.4, ax=ax, cmap="OrRd")
label_y = ax.get_yticklabels()
plt.setp(label_y, rotation=360, horizontalalignment="right")
label_x = ax.get_xticklabels()
plt.setp(label_x, rotation=0, horizontalalignment="right", fontsize=20)
plt.show()
颜色越深代表权重越高。从图可以看出,注意力权重比较高的单词是"great"和"awesome"。
6 完整代码
'''
@Function:基于双向LSTM和注意力机制实现文本分类
@Author: lxy
@Date: 2024/12/12
'''
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset
from functools import partial
import random
import numpy as np
import time
import matplotlib.pyplot as plt

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
"""添加全局配置,默认放到cuda上
"""
if torch.cuda.is_available():torch.set_default_tensor_type('torch.cuda.FloatTensor') # 设置默认张量类型为GPU上的FloatTensor类型print("放置成功")
else:torch.set_default_tensor_type('torch.FloatTensor') # 如果GPU不可用,设置为CPU上的FloatTensor类型
# seqs是一个由tensor格式的元素组成的列表,将这个列表转为tensor
def list2tensor(seqs):
    # 将seqs中的每个Tensor元素转换为numpy数组
    numpy_seqs = [element.detach().cpu().numpy() for element in seqs]
    # 将numpy数组沿第0轴堆叠(效果与torch.stack的dim=0类似)
    stacked_numpy_seqs = np.stack(numpy_seqs, axis=0)
    # 再将堆叠后的numpy数组转换为Tensor并移动到指定设备上
    seqs_tensor = torch.from_numpy(stacked_numpy_seqs).to(device)
    return seqs_tensor

# 加载IMDB数据集
def load_imdb_data(path):
    assert os.path.exists(path)
    # 初始化数据集列表
    trainset, devset, testset = [], [], []
    # 加载训练集数据
    for label in ['pos', 'neg']:
        label_path = os.path.join(path, 'train', label)
        for filename in os.listdir(label_path):
            if filename.endswith('.txt'):
                with open(os.path.join(label_path, filename), 'r', encoding='utf-8') as f:
                    sentence = f.read().strip().lower()  # 读取并处理每个评论
                    trainset.append((sentence, label))
    # 加载测试集数据
    for label in ['pos', 'neg']:
        label_path = os.path.join(path, 'test', label)
        for filename in os.listdir(label_path):
            if filename.endswith('.txt'):
                with open(os.path.join(label_path, filename), 'r', encoding='utf-8') as f:
                    sentence = f.read().strip().lower()  # 读取并处理每个评论
                    testset.append((sentence, label))
    # 随机拆分测试集的一半作为验证集
    random.shuffle(testset)   # 打乱测试集顺序
    random.shuffle(trainset)
    split_index = len(testset) // 2    # 计算拆分索引
    devset = testset[:split_index]     # 选择测试集前一半作为验证集
    testset = testset[split_index:]    # 剩下的部分作为测试集
    return trainset, devset, testset

# 加载词汇表文件,创建单词到ID的映射字典
def load_vocab(path):
    """
    功能:从给定路径加载词汇表文件,并创建单词到ID的映射字典
    参数:
        path: 词汇表文件的路径
    返回值:
        word2id: 单词到ID的映射字典,形如 {'word1': 0, 'word2': 1, ...}
    """
    assert os.path.exists(path)  # 确保词表文件路径存在
    words = []  # 初始化空列表,存储词表中的单词
    with open(path, "r", encoding="utf-8") as f:  # 打开文件并读取内容
        words = f.readlines()  # 读取文件中的所有行
    words = [word.strip() for word in words if word.strip()]  # 移除每个单词的前后空白字符并去掉空行
    word2id = dict(zip(words, range(len(words))))  # 创建一个字典,将单词与对应的ID映射
    return word2id  # 返回这个字典

# 自定义数据集类,用于处理IMDB数据集中的文本和标签
class IMDBDataset(Dataset):
    def __init__(self, examples, word2id_dict):
        super(IMDBDataset, self).__init__()
        self.word2id_dict = word2id_dict
        self.examples = self.words_to_id(examples)

    '''
    将文本句子中的单词转换为对应的ID,处理不在词汇表中的单词(用[UNK]对应的ID替代),
    并将标签转换为数值形式('pos' -> 1, 'neg' -> 0)
    '''
    def words_to_id(self, examples):
        tmp_examples = []
        for idx, example in enumerate(examples):
            seq, label = example
            # 将单词映射为字典索引的ID, 对于词典中没有的单词用[UNK]对应的ID进行替代
            seq = [self.word2id_dict.get(word, self.word2id_dict['[UNK]']) for word in seq.split(" ")]
            seq_tensor = torch.tensor(seq).to(device)  # 将张量移到正确设备
            # 映射标签: 'pos' -> 1, 'neg' -> 0
            label = 1 if label == 'pos' else 0
            tmp_examples.append([seq_tensor, label])
        return tmp_examples

    def __getitem__(self, idx):  # 获取指定索引位置的示例数据
        seq, label = self.examples[idx]
        return seq, label

    def __len__(self):  # 返回数据集的大小
        return len(self.examples)


def collate_fn(batch_data, pad_val=1):
    seqs = []
    labels = []
    lens = []
    for seq, label in batch_data:
        seqs.append(seq)
        labels.append(label)  # 直接添加label即可,不需要再额外嵌套一层列表
        lens.append(len(seq))
    max_len = max(lens)
    # 对序列进行填充操作,使其长度统一为批次中的最大长度
    for i in range(len(seqs)):
        # 填充部分与seqs[i]同为1维序列,形状为 (需要填充的长度,)
        padding_tensor = torch.full((max_len - len(seqs[i]),), pad_val, dtype=torch.long).to(device)
        # 使用torch.cat在维度0(序列长度维度)上拼接
        seqs[i] = torch.cat([seqs[i], padding_tensor], dim=0)
    # 将序列列表转换为PyTorch张量
    seqs_tensor = list2tensor(seqs).to(device)
    lens_tensor = torch.tensor(lens).to(device)
    labels_tensor = torch.tensor(labels).unsqueeze(1).to(device)
    return (seqs_tensor, lens_tensor), labels_tensor

# ===============封装dataloader=========================
# 加载IMDB数据集
train_data, dev_data, test_data = load_imdb_data("./dataset/")
# 加载词汇表字典
word2id_dict = load_vocab("./dataset/imdb.vocab")
# 实例化Dataset
train_set = IMDBDataset(train_data, word2id_dict)
dev_set = IMDBDataset(dev_data, word2id_dict)
test_set = IMDBDataset(test_data, word2id_dict)
max_seq_len = 256
batch_size = 128
train_loader = torch.utils.data.DataLoader(train_set, batch_size=batch_size, shuffle=False,
                                           drop_last=False, collate_fn=collate_fn)
dev_loader = torch.utils.data.DataLoader(dev_set, batch_size=batch_size, shuffle=False,
                                         drop_last=False, collate_fn=collate_fn)
test_loader = torch.utils.data.DataLoader(test_set, batch_size=batch_size, shuffle=False,
                                          drop_last=False, collate_fn=collate_fn)

'''
注意力打分函数
'''
# ===============加性==================
class AdditiveScore(nn.Module):
    def __init__(self, hidden_size):
        super(AdditiveScore, self).__init__()
        self.fc_W = nn.Linear(hidden_size, hidden_size, bias=False)
        self.fc_U = nn.Linear(hidden_size, hidden_size, bias=False)
        self.fc_v = nn.Linear(hidden_size, 1, bias=False)
        # 查询向量在[-0.5, 0.5]均匀分布上随机初始化,注册为可训练参数
        self.q = nn.Parameter(torch.rand(1, hidden_size) - 0.5)

    def forward(self, inputs):
        """
        输入:
            - inputs:输入矩阵,shape=[batch_size, seq_len, hidden_size]
        输出:
            - scores:输出矩阵,shape=[batch_size, seq_len]
        """
        batch_size, seq_len, hidden_size = inputs.shape
        # 将查询向量扩展到与输入批次维度匹配: [batch_size, 1, hidden_size]
        q_expanded = self.q.expand(batch_size, -1, hidden_size)
        # scores: [batch_size, seq_len, hidden_size]
        scores = torch.tanh(self.fc_W(inputs) + self.fc_U(q_expanded))
        # scores: [batch_size, seq_len]
        scores = self.fc_v(scores).squeeze(-1)
        return scores

# torch.manual_seed(2021)
# inputs = torch.rand(1, 3, 3)
# additiveScore = AdditiveScore(hidden_size=3)
# scores = additiveScore(inputs)
# print(scores)

# =================点积========================
class DotProductScore(nn.Module):
    def __init__(self, hidden_size):
        super(DotProductScore, self).__init__()
        # 查询向量在[-0.5, 0.5]均匀分布上随机初始化,注册为可训练参数
        self.q = nn.Parameter(torch.rand(hidden_size, 1) - 0.5)

    def forward(self, inputs):
        """
        输入:
            - inputs:输入矩阵,shape=[batch_size, seq_len, hidden_size]
        输出:
            - scores:输出矩阵,shape=[batch_size, seq_len]
        """
        # 获取输入张量的形状信息
        batch_size, seq_length, hidden_size = inputs.shape
        # 点积运算: [batch_size, seq_len, hidden_size] x [hidden_size, 1] -> [batch_size, seq_len, 1]
        scores = torch.matmul(inputs, self.q)
        # 去除最后一维(大小为1): [batch_size, seq_len]
        scores = scores.squeeze(-1)
        return scores

# torch.manual_seed(2021)
# inputs = torch.rand(1, 3, 3)
# dotScore = DotProductScore(hidden_size=3)
# scores = dotScore(inputs)
# print(scores)

'''
注意力层
'''
class Attention(nn.Module):
    def __init__(self, hidden_size, use_additive=False):
        super(Attention, self).__init__()
        self.use_additive = use_additive
        # 使用加性模型或者点积模型
        if self.use_additive:
            self.scores = AdditiveScore(hidden_size)
        else:
            self.scores = DotProductScore(hidden_size)
        self._attention_weights = None

    def forward(self, X, valid_lens):
        """
        输入:
            - X:输入矩阵,shape=[batch_size, seq_len, hidden_size]
            - valid_lens:长度矩阵,shape=[batch_size]
        输出:
            - context:注意力加权平均的结果,shape=[batch_size, hidden_size]
        """
        # scores: [batch_size, seq_len]
        scores = self.scores(X)
        # arrange: [1, seq_len],比如seq_len=4时为 [[0, 1, 2, 3]]
        arrange = torch.arange(scores.shape[1], dtype=torch.float32, device=scores.device).unsqueeze(0)
        # valid_lens: [batch_size, 1]
        valid_lens = valid_lens.unsqueeze(1)
        # mask: [batch_size, seq_len],有效位置为True
        mask = arrange < valid_lens
        # 无效位置([PAD])的打分置为-1e9,softmax后权重趋于0
        y = torch.full_like(scores, -1e9)
        scores = torch.where(mask, scores, y)
        # attn_weights: [batch_size, seq_len]
        attn_weights = F.softmax(scores, dim=-1)
        self._attention_weights = attn_weights
        # context: [batch_size, 1, hidden_size]
        context = torch.matmul(attn_weights.unsqueeze(1), X)
        # context: [batch_size, hidden_size]
        context = context.squeeze(1)
        return context

    @property
    def attention_weights(self):
        return self._attention_weights

# 使用不同的打分函数进行测试
# torch.manual_seed(2021)
# X = torch.rand(1, 3, 3)
# valid_lens = torch.tensor([2])
# print("输入向量为 {}".format(X))
# add_atten = Attention(hidden_size=3, use_additive=True)
# context1 = add_atten(X, valid_lens)
# dot_atten = Attention(hidden_size=3, use_additive=False)
# context2 = dot_atten(X, valid_lens)
# print("============使用加性打分函数时:=============")
# print("注意力的输出为 : {}".format(context1))
# print("注意力权重为 : {}".format(add_atten.attention_weights))
# print("============使用点积打分函数时:=============")
# print("注意力的输出为 : {}".format(context2))
# print("注意力权重为 : {}".format(dot_atten.attention_weights))'''
模型汇总
'''
class Model_LSTMAttention(nn.Module):
    def __init__(
        self,
        hidden_size,
        embedding_size,
        vocab_size,
        n_classes=10,
        n_layers=1,
        use_additive=False,
    ):
        super(Model_LSTMAttention, self).__init__()
        # LSTM单元的隐藏神经元数量,也是hidden和cell状态向量的维度
        self.hidden_size = hidden_size
        # 词向量的维度
        self.embedding_size = embedding_size
        # 词典的单词数量
        self.vocab_size = vocab_size
        # 文本分类的类别数量
        self.n_classes = n_classes
        # LSTM的层数
        self.n_layers = n_layers
        # 定义embedding层
        self.embedding = nn.Embedding(num_embeddings=self.vocab_size, embedding_dim=self.embedding_size)
        # 定义LSTM,用来编码句子;batch_first=True使输入输出形状为[batch, seq, feature],与embedding输出一致
        self.lstm = nn.LSTM(
            input_size=self.embedding_size,
            hidden_size=self.hidden_size,
            num_layers=self.n_layers,
            batch_first=True,
            bidirectional=True,  # 双向LSTM
        )
        # 双向LSTM的输出维度
        output_size = self.hidden_size * 2
        # 定义Attention层
        self.attention = Attention(output_size, use_additive=use_additive)
        # 定义分类层,将语义向量映射到相应的类别
        self.cls_fc = nn.Linear(in_features=output_size, out_features=self.n_classes)

    def forward(self, inputs):
        input_ids, valid_lens = inputs
        # 获取词向量: [batch_size, seq_len, embedding_size]
        embedded_input = self.embedding(input_ids)
        # 使用LSTM进行语义编码: output为[batch_size, seq_len, hidden_size*2]
        output, (last_hidden, last_cell) = self.lstm(embedded_input)
        # 进行Attention加权聚合: [batch_size, hidden_size*2]
        output = self.attention(output, valid_lens)
        # 通过分类线性层,获得类别得分
        logits = self.cls_fc(output)
        return logits
'''
模型训练
'''
from Runner import RunnerV3, Accuracy, plot_training_loss_acc
# 迭代的epoch
epochs = 2
# 词汇表的大小
vocab_size = len(word2id_dict)
# lstm的输出单元的大小
hidden_size = 128
# embedding的维度
embedding_size = 128
# 类别数
n_classes = 2
# lstm的层数
n_layers = 1
# 学习率
learning_rate = 0.001
# 定义交叉熵损失
criterion = nn.CrossEntropyLoss()
# 指定评价指标
metric = Accuracy()
# 实例化基于LSTM的注意力模型
model_atten = Model_LSTMAttention(
    hidden_size,
    embedding_size,
    vocab_size,
    n_classes=n_classes,
    n_layers=n_layers,
    use_additive=True,
)
model_atten.to(device)
# 定义优化器
optimizer = torch.optim.Adam(model_atten.parameters(), lr=learning_rate)
# 实例化
runner = RunnerV3(model_atten, optimizer, criterion, metric)
save_path = "./checkpoint2/model_best.pth"
start_time = time.time()
# 训练,假设train_loader和dev_loader是已经正确定义好的PyTorch的DataLoader对象,用于加载训练集和验证集数据
runner.train(
    train_loader,
    dev_loader,
    num_epochs=epochs,
    log_steps=10,
    eval_steps=10,
    save_path=save_path,
)
end_time = time.time()
print("训练时间:{}".format(end_time - start_time))# #plot_training_loss_acc(runner, fig_size=(16, 6), sample_step=10, loss_legend_loc="lower left",
# acc_legend_loc="lower right")
'''
模型评价
'''
model_path = "./checkpoint2/model_best.pth"
runner.load_model(model_path)
accuracy, _ = runner.evaluate(test_loader)
print(f"Evaluate on test set, Accuracy: {accuracy:.5f}")# ============注意力可视化===============
model_path = "checkpoint2/model_best.pth" # 将后缀修改为.pth,这是PyTorch模型保存的常见后缀格式
model_atten = Model_LSTMAttention(
    hidden_size,
    embedding_size,
    vocab_size,
    n_classes=n_classes,
    n_layers=n_layers,
    use_additive=True,  # 可视化的是加性注意力,需与训练保存的模型一致
)
model_state_dict = torch.load(model_path) # 使用torch.load加载模型参数
model_atten.load_state_dict(model_state_dict) # 将加载的参数应用到模型中
text = "this great science fiction film is really awesome"
# text = "This movie was craptacular"
# text = "I got stuck in traffic on the way to the theater"
# 分词
sentence = text.split(" ")
# 词映射成ID的形式
tokens = [word2id_dict[word] if word in word2id_dict else word2id_dict["[UNK]"]
          for word in sentence]  # 词表外的单词用[UNK]对应的ID替代,与IMDBDataset保持一致
# 取前max_seq_len的单词
tokens = tokens[:max_seq_len]
# 序列长度
seq_len = torch.tensor([len(tokens)], dtype=torch.long).to(device)  # 序列长度,移到相应设备
# 转换成PyTorch的Tensor,并增加一个批次维度(unsqueeze(0)的作用)
input_ids = torch.tensor(tokens, dtype=torch.long).unsqueeze(0).to(device) # 确保移到相应设备上(如GPU)
inputs = [input_ids, seq_len]
# 模型开启评估模式
model_atten.eval()
# 设置不求梯度,使用torch.no_grad上下文管理器
with torch.no_grad():
    # 预测输出
    pred_prob = model_atten(inputs)
# 提取注意力权重
atten_weights = model_atten.attention.attention_weights
print("输入的文本为:{}".format(text))
print("转换成id的形式为:{}".format(input_ids.cpu().numpy())) # 将张量移到CPU上再转为numpy数组进行打印输出
print("训练的注意力权重为:{}".format(atten_weights.cpu().numpy())) # 同样移到CPU上转为numpy数组打印import seaborn as sns
import pandas as pd
from matplotlib import rcParams
rcParams.update({'figure.autolayout': True})
# 对文本进行分词,得到过滤后的词
list_words = text.split(" ")
# 移到CPU上转为numpy数组
atten_weights = atten_weights.cpu().numpy()
# 提取注意力权重,转换成list
data_attention = atten_weights.tolist()
print(data_attention)
# 取出前max_seq_len个词,与注意力权重一一对应
list_words = list_words[:max_seq_len]
# 把权重转换为DataFrame,列名为单词
d = pd.DataFrame(data=data_attention, columns=list_words)
f, ax = plt.subplots(figsize=(20, 1.5))
sns.heatmap(d, vmin=0, vmax=0.4, ax=ax, cmap="OrRd")
label_y = ax.get_yticklabels()
plt.setp(label_y, rotation=360, horizontalalignment="right")
label_x = ax.get_xticklabels()
plt.setp(label_x, rotation=0, horizontalalignment="right", fontsize=20)
plt.show()
7 参考链接
aistudio.baidu.com/projectdetail/8666087 --->源项目地址
【深度学习】(1) CNN中的注意力机制(SE、ECA、CBAM)
通俗易懂理解注意力机制(Attention Mechanism)-CSDN博客 --->点赞点赞,学到了很多
注意力机制的两种常用形式-----加性注意力(Additive Attention)和点积注意力(Dot-product Attention)