一、基于pytorch的网络编写一个分词模型
#coding:utf8import torch
import torch.nn as nn
import jieba
import numpy as np
import random
import json
from torch.utils.data import DataLoader"""
基于pytorch的网络编写一个分词模型
我们使用jieba分词的结果作为训练数据
看看是否可以得到一个效果接近的神经网络模型
"""# TorchModel 类定义了一个包含嵌入层、RNN 层和线性分类层的神经网络
# 使用 nn.Embedding 将字符映射到高维空间,通过 nn.RNN 处理序列信息,使用 nn.Linear 进行分类。
class TorchModel(nn.Module):def __init__(self, input_dim, hidden_size, num_rnn_layers, vocab):super(TorchModel, self).__init__()self.embedding = nn.Embedding(len(vocab)+1, input_dim, padding_idx=0)self.rnn_layer = nn.RNN(input_size=input_dim,hidden_size=hidden_size,batch_first=True,num_layers=num_rnn_layers)self.classify = nn.Linear(hidden_size, 2)self.loss_func = nn.CrossEntropyLoss(ignore_index=-100)def forward(self, x, y=None):x = self.embedding(x) #input shape: (batch_size, sen_len), output shape:(batch_size, sen_len, input_dim)x, _ = self.rnn_layer(x) #output shape:(batch_size, sen_len, hidden_size)y_pred = self.classify(x) #output shape:(batch_size, sen_len, 2) -> y_pred.view(-1, 2) (batch_size*sen_len, 2)if y is not None:return self.loss_func(y_pred.view(-1, 2), y.view(-1))else:return y_pred# Dataset 类用于处理语料库数据,从文件中读取句子
# 使用 sentence_to_sequence 将句子转换为数字序列,使用 sequence_to_label 生成标记序列,使用 padding 方法将序列和标签填充到固定长度。
class Dataset:def __init__(self, corpus_path, vocab, max_length):self.vocab = vocabself.corpus_path = corpus_pathself.max_length = max_lengthself.load()def load(self):self.data = []with open(self.corpus_path, encoding="utf8") as f:for line in f:sequence = sentence_to_sequence(line, self.vocab)label = sequence_to_label(line)sequence, label = self.padding(sequence, label)sequence = torch.LongTensor(sequence)label = torch.LongTensor(label)self.data.append([sequence, label])#使用部分数据做展示,使用全部数据训练时间会相应变长if len(self.data) > 10000:break#将文本截断或补齐到固定长度def padding(self, sequence, label):sequence = sequence[:self.max_length]sequence += [0] * (self.max_length - len(sequence))label = label[:self.max_length]label += [-100] * (self.max_length - len(label))return sequence, labeldef __len__(self):return len(self.data)def __getitem__(self, item):return self.data[item]#文本转化为数字序列,为embedding做准备
def sentence_to_sequence(sentence, vocab):sequence = [vocab.get(char, vocab['unk']) for char in sentence]return sequence#基于结巴生成分级结果的标注
def sequence_to_label(sentence):words = jieba.lcut(sentence)label = [0] * len(sentence)pointer = 0for word in words:pointer += len(word)label[pointer - 1] = 1return label#加载字表
def build_vocab(vocab_path):vocab = {}with open(vocab_path, "r", encoding="utf8") as f:for index, line in enumerate(f):char = line.strip()vocab[char] = index + 1 #每个字对应一个序号vocab['unk'] = len(vocab) + 1return vocab#建立数据集
def build_dataset(corpus_path, vocab, max_length, batch_size):dataset = Dataset(corpus_path, vocab, max_length) #diy __len__ __getitem__data_loader = DataLoader(dataset, shuffle=True, batch_size=batch_size) #torchreturn data_loaderdef main():epoch_num = 5 #训练轮数batch_size = 20 #每次训练样本个数char_dim = 50 #每个字的维度hidden_size = 100 #隐含层维度num_rnn_layers = 1 #rnn层数max_length = 20 #样本最大长度learning_rate = 1e-3 #学习率vocab_path = "chars.txt" #字表文件路径corpus_path = "../corpus.txt" #语料文件路径vocab = build_vocab(vocab_path) #建立字表data_loader = build_dataset(corpus_path, vocab, max_length, batch_size) #建立数据集model = TorchModel(char_dim, hidden_size, num_rnn_layers, vocab) #建立模型optim = torch.optim.Adam(model.parameters(), lr=learning_rate) #建立优化器#训练开始for epoch in range(epoch_num):model.train()watch_loss = []for x, y in data_loader:optim.zero_grad() #梯度归零loss = model.forward(x, y) #计算lossloss.backward() #计算梯度optim.step() #更新权重watch_loss.append(loss.item())print("=========\n第%d轮平均loss:%f" % (epoch + 1, np.mean(watch_loss)))#保存模型torch.save(model.state_dict(), "model.pth")return#最终预测
def predict(model_path, vocab_path, input_strings):#配置保持和训练时一致char_dim = 50 # 每个字的维度hidden_size = 100 # 隐含层维度num_rnn_layers = 1 # rnn层数vocab = build_vocab(vocab_path) #建立字表model = TorchModel(char_dim, hidden_size, num_rnn_layers, vocab) #建立模型model.load_state_dict(torch.load(model_path)) #加载训练好的模型权重model.eval()for input_string in input_strings:#逐条预测x = sentence_to_sequence(input_string, vocab)with torch.no_grad():result = model.forward(torch.LongTensor([x]))[0]result = torch.argmax(result, dim=-1) #预测出的01序列#在预测为1的地方切分,将切分后文本打印出来for index, p in enumerate(result):if p == 1:print(input_string[index], end=" ")else:print(input_string[index], end="")print()if __name__ == "__main__":# main()input_strings = ["同时国内有望出台新汽车刺激方案","沪胶后市有望延续强势","经过两个交易日的强势调整后","昨日上海天然橡胶期货价格再度大幅上扬"]predict("model.pth", "chars.txt", input_strings)
模型分析
- 模型定义:
TorchModel
类定义了一个包含嵌入层、RNN 层和线性分类层的神经网络,使用nn.Embedding
将字符映射到高维空间,通过nn.RNN
处理序列信息,使用nn.Linear
进行分类。- 数据集处理:
Dataset
类用于处理语料库数据,从文件中读取句子,使用sentence_to_sequence
将句子转换为数字序列,使用sequence_to_label
生成标记序列,使用padding
方法将序列和标签填充到固定长度。build_vocab
函数从文件中构建词汇表,build_dataset
函数使用Dataset
类和DataLoader
进行批处理。
- 训练部分:
main
函数中,设置超参数,构建模型和优化器,进行多轮训练,计算损失、反向传播和更新参数,保存训练好的模型。
- 预测部分:
predict
函数加载训练好的模型和词汇表,对输入句子进行分词预测,将预测为 1 的位置进行分词,将结果打印输出。
- 数据预处理:
sentence_to_sequence
函数将输入的句子中的字符根据词汇表转换为数字序列,未在词汇表中的字符使用unk
的索引。sequence_to_label
函数利用结巴分词的结果,将分词结束位置标记为 1,其余为 0,生成标记序列。Dataset
类的padding
方法确保所有序列和标签具有相同的长度,便于批处理。
- 模型架构:
TorchModel
类的embedding
层将输入的数字序列映射到高维空间,rnn_layer
处理序列信息,classify
层将 RNN 的输出映射到 2 个类别(分词或不分词)。- 训练时使用
CrossEntropyLoss
计算损失,预测时使用torch.argmax
找到最可能的类别。
- 训练和预测流程:
main
函数设置训练的超参数,创建数据集和模型,使用Adam
优化器进行优化,保存训练好的模型。predict
函数加载训练好的模型,对输入句子进行分词预测并输出结果。
二、DAG(有向无环图)法做分词
import jieba#词典,每个词后方存储的是其词频,仅为示例,也可自行添加
Dict = {"经常":0.1,"经":0.05,"有":0.1,"常":0.001,"有意见":0.1,"歧":0.001,"意见":0.2,"分歧":0.2,"见":0.05,"意":0.05,"见分歧":0.05,"分":0.1}#根据上方词典,对于输入文本,构造一个存储有所有切分方式的信息字典
#学术叫法为有向无环图,DAG(Directed Acyclic Graph),不理解也不用纠结,只当是个专属名词就好
#这段代码直接来自于jieba分词
# jieba.cut
def calc_dag(sentence):DAG = {}n = len(sentence)for k in range(n):i = ktmplist = []while i < n:frag = sentence[k: i+1]if frag in Dict:tmplist.append(i)i += 1if not tmplist:tmplist = [k]DAG[k] = tmplistreturn DAGsentence = "经常有意见分歧"
print(calc_dag(sentence))
#结果应该为{0: [0, 1], 1: [1], 2: [2, 4], 3: [3, 4], 4: [4, 6], 5: [5, 6], 6: [6]}
#0:[0,1]代表句子中的第0个字,可以单独成词,或与第1个字一起成词
#2:[2,4]代表句子中的第2个字,可以单独成词,或第2-4个字一起成词
#依次类推
#这个字典中实际上就存储了所有可能的切分方式的信息#将DAG中的信息解码(还原)出来,用文本展示出所有切分方式
class DAGDecode:#通过两个队列来实现def __init__(self, sentence):self.sentence = sentenceself.DAG = calc_dag(sentence) #使用了上方的函数self.length = len(sentence)self.unfinish_path = [[]] #保存待解码序列的队列self.finish_path = [] #保存解码完成的序列的队列#对于每一个序列,检查是否需要继续解码#不需要继续解码的,放入解码完成队列#需要继续解码的,将生成的新队列,放入待解码队列#path形如:["经常", "有", "意见"]def decode_next(self, path):path_length = len("".join(path))if path_length == self.length: #已完成解码self.finish_path.append(path)returncandidates = self.DAG[path_length]new_paths = []for candidate in candidates:new_paths.append(path + [self.sentence[path_length:candidate+1]])self.unfinish_path += new_paths #放入待解码对列return#递归调用序列解码过程def decode(self):while self.unfinish_path != []:path = self.unfinish_path.pop(0) #从待解码队列中取出一个序列self.decode_next(path) #使用该序列进行解码sentence = "经常有意见分歧"
dd = DAGDecode(sentence)
dd.decode()
print(dd.finish_path)
代码分析
一、函数和类的功能分析:
-
calc_dag(sentence)
函数:- 功能:
- 该函数的主要目的是根据输入的句子和预定义的词典
Dict
构建一个有向无环图(DAG),用于存储句子中所有可能的词切分信息。
- 该函数的主要目的是根据输入的句子和预定义的词典
- 实现步骤:
- 首先,初始化一个空字典
DAG
用于存储结果。 - 获取输入句子的长度
n
。 - 遍历句子中的每个字符,从当前字符开始,通过不断增加子串长度,检查子串是否在
Dict
中。 - 若子串在
Dict
中,将该子串结束字符的索引添加到tmplist
中。 - 若
tmplist
为空,说明当前字符没有可切分的词,将当前字符索引添加到tmplist
。 - 最后将
k
作为键,tmplist
作为值存储在DAG
中。
- 首先,初始化一个空字典
- 功能:
-
DAGDecode
类:__init__(self, sentence)
方法:- 功能:
- 对输入的句子进行初始化操作,为后续的解码操作准备所需的数据结构。
- 实现步骤:
- 存储输入的句子。
- 调用
calc_dag(sentence)
函数生成有向无环图,并存储在self.DAG
中。 - 存储句子的长度。
- 初始化两个队列:
self.unfinish_path
存储待解码的序列,初始化为只包含一个空列表的列表;self.finish_path
存储已完成解码的序列,初始化为空列表。
- 功能:
decode_next(self, path)
方法:- 功能:
- 对于给定的部分解码路径,判断是否完成解码,若未完成则根据
self.DAG
生成新的待解码路径并添加到self.unfinish_path
中,若完成则添加到self.finish_path
中。
- 对于给定的部分解码路径,判断是否完成解码,若未完成则根据
- 实现步骤:
- 计算当前
path
所代表的字符串的长度。 - 若长度等于句子长度,说明解码完成,将
path
加入self.finish_path
。 - 若未完成,根据
self.DAG
中存储的信息,找出可能的下一个词的结束位置,生成新的解码路径并添加到self.unfinish_path
中。
- 计算当前
- 功能:
decode(self)
方法:- 功能:
- 循环从
self.unfinish_path
中取出路径,调用decode_next
方法进行解码,直到self.unfinish_path
为空。
- 循环从
- 实现步骤:
- 只要
self.unfinish_path
不为空,就取出其中的一个元素。 - 调用
decode_next
方法对该元素进行解码。
- 只要
- 功能:
二、代码逻辑总结:
- 首先,使用
calc_dag(sentence)
函数对输入的句子构建一个有向无环图,该图以字典的形式存储了从每个字符开始的所有可能的词切分信息。例如对于输入"经常有意见分歧"
,会得到{0: [0, 1], 1: [1], 2: [2, 4], 3: [3, 4], 4: [4, 6], 5: [5, 6], 6: [6]}
。 - 然后,
DAGDecode
类利用这个有向无环图进行解码操作:- 在
__init__
阶段,存储句子、有向无环图、句子长度,并初始化待解码和已完成解码的队列。 decode_next
方法会根据当前的部分解码结果判断是否继续解码,若继续解码,会根据DAG
生成新的可能路径添加到待解码队列中,若完成则添加到已完成队列中。decode
方法通过不断调用decode_next
方法处理待解码队列中的元素,最终将所有可能的句子切分方式存储在finish_path
中。
- 在
三、代码解释示例:
- 以输入句子
"经常有意见分歧"
为例:- 在
calc_dag
函数中:- 从
k = 0
开始,"经"
在Dict
中,"经常"
也在Dict
中,所以DAG[0] = [0, 1]
。 - 对于
k = 1
,只有"常"
在Dict
中,所以DAG[1] = [1]
。 - 对于
k = 2
,"有"
在Dict
中,"有意见"
也在Dict
中,所以DAG[2] = [2, 4]
。 - 以此类推,最终得到完整的
DAG
。
- 从
- 在
DAGDecode
类中:- 初始化时,
unfinish_path = [[]]
,finish_path = []
。 - 第一次调用
decode_next
对[]
进行处理,会根据DAG[0]
生成["经"]
和["经常"]
等新路径添加到unfinish_path
。 - 不断循环调用
decode_next
,直到unfinish_path
为空,最终得到所有可能的句子切分方式存储在finish_path
中。
- 初始化时,
- 在