文章目录
- 深入解析 Transformer
- 1 理解 Transformer
- 1.1 理解自注意力机制 (Self-Attention)
- 1.2 理解位置编码 (Positional Encoding)
- 1.2.1 整数编码
- 1.2.2 正弦编码
- 1.3 理解编码器和解码器模块
- 1.3.1 编码器
- 1.4 最终线性层和 Softmax 层
- 2 编写 Transformer 的代码
- 2.1 摘要和引言
- 2.2 背景
- 2.3 模型架构
- 2.4 代码
- 2.4.1 多头注意力机制MHA
- 2.4.2 前馈网络
- 2.4.3 位置编码
- 2.4.4 编码器层
- 2.4.5 解码器层
- 2.4.6 编码器
- 2.4.7 解码器
- 2.4.8 整个Transformer
- 2.4.9 训练Transformers
- 3 其它及进一步阅读
深入解析 Transformer
https://goyalpramod.github.io/blogs/Transformers_laid_out/
提醒:阅读本文大约需要40分钟或者更多。
我发现现在关于 Transformer 的博客、视频和教程主要分为以下三类:
- 解释 Transformer 工作原理的: 其中 Jay Alammar 的博客是佼佼者。
- 解读 “Attention is All You Need” 论文的: 例如 The Annotated Transformer。
- 用 PyTorch 实现 Transformer 的: 例如 “Coding a ChatGPT Like Transformer From Scratch in PyTorch”。
这些资料都以出色的方式,从多个角度帮助我们理解单个概念。(本博客深受上述作品影响)
本文旨在:
- 直观地解释 Transformer 的工作原理。
- 详细阐述论文的每个部分,以及如何理解和实现它们。
- 从初学者的角度,用 PyTorch 编写代码实现。
- 将所有内容整合在一起,一站式学习。
如何使用本博客
我将首先简要概述 Transformer 的工作原理,以及它最初被开发的原因。
在建立基本背景之后,我们将深入代码本身。
我将引用论文中的章节,并说明我们将要编码的 Transformer 部分,同时提供代码示例,并附带提示和文档链接,例如:
class TransformerLRScheduler:def __init__(self, optimizer, d_model, warmup_steps):"""Args:optimizer: Optimizer to adjust learning rate ford_model: Model dimensionalitywarmup_steps: Number of warmup steps"""# YOUR CODE HEREdef step(self, step_num):"""Update learning rate based on step number"""# lrate = d_model^(-0.5) * min(step_num^(-0.5), step_num * warmup_steps^(-1.5))# YOUR CODE HERE - implement the formula
我将在代码块后添加有用的链接,但我建议你首先自己进行研究。这是成为一名优秀的工程师的第一步。
我建议你复制这些代码块,并尝试自己实现它们。
为了简化过程,在开始编码之前,我将详细解释每个部分。如果你仍然无法解决,请回来查看我的实现。
1 理解 Transformer
最初的 Transformer 是为机器翻译任务而设计的,这也是我们将要做的。我们将尝试将英语句子“I like Pizza”翻译成印地语。
不过,在此之前,让我们先简单了解一下 Transformer 这个黑盒。我们可以看到它由编码器(Encoder)和解码器(Decoder)组成。
在传递给编码器之前,句子“I like Pizza”会被分解成各个单词*,并使用嵌入(embedding)矩阵(与 Transformer 一起训练)进行嵌入。
然后,将位置信息添加到这些嵌入中。
之后,这些嵌入被传递到编码器模块,它主要完成两件事:
- 应用自注意力机制(Self-Attention)来理解各个单词彼此之间的关系。
- 将自注意力分数输出到前馈网络(Feed Forward Network)。
解码器模块接收来自编码器的输出,对其进行自身处理,产生一个输出,并将该输出发送回自身以生成下一个单词。
你可以这样理解:编码器理解你的语言,我们称之为 X,以及另一种语言,我们称之为 Y。解码器理解 Y 和你尝试翻译的语言 X,我们称之为 Z。
因此,Y 充当编码器和解码器交流的通用语言,以产生最终输出。
*为了便于理解,我们使用了单词,但大多数现代大型语言模型 (LLM) 并不使用单词,而是使用“标记(Token)”。
1.1 理解自注意力机制 (Self-Attention)
我们都听说过著名的三元组“查询(Query)、键(Key)和值(Value)”。我曾一度迷失在思考这些术语背后的由来。Q、K、Y 与字典(或传统 CS 中的映射)有关吗?它是受到之前论文的启发吗?如果是,它们是如何出现的?
让我们首先建立对这个想法的直观理解。
句子 (S): “Pramod loves pizza”(Pramod 喜欢披萨)
问题:
"who loves pizza?"谁喜欢披萨?
你可以为这个句子提出任意多的问题(查询)。现在,对于每个查询,你都会有一个特定的信息片段(键),它将为你提供想要的答案(值)。
查询:
- Q ->who loves pizza 谁喜欢披萨?
- K -> pizza披萨、Pramod、loves喜欢(实际上所有单词都会有不同程度的重要性)
- V -> pizza披萨(值并非直接的答案,而是类似于答案的矩阵表示)
这实际上是一种过于简化的说法,但它可以帮助我们理解,查询、键和值都只能通过句子本身创建。
让我们先了解如何应用自注意力机制,然后了解为什么要这样做。此外,在接下来的解释中,将 Q、K、V 视为纯矩阵,而不是其他任何东西。
首先,将单词 “delicious pizza 美味的披萨” 转换为嵌入。然后,将它们乘以权重 W_Q、W_K 和 W_V 以生成 Q、K 和 V 向量。
这些权重 W_Q、W_K 和 W_V 与 Transformer 一起训练。注意,向量 Q、K 和 V 的大小小于 x1 和 x2 的大小。即,x1 和 x2 是大小为 512 的向量,而 Q、K 和 V 的大小为 64。这是一种架构选择,目的是使计算更小、更快。
总的来看,就是x1要变成q1,k1,v1 ,x2要变成q2,k2,v2,如果有更多的输入,那么是同理;上图是x是1x4,q,k,v则是1x3, 如果有多个输入,x=Nx4 W_Q=4x3 得到 q=Nx3.
现在,使用这些 Q、K 和 V 向量计算注意力分数。
为了计算第一个单词“delicious美味的”的注意力分数,我们取该单词的查询 (q1) 和键 (k1),并将它们进行点积运算。(点积非常适合查找事物之间的相似性)。
然后,我们将其除以键向量维度的平方根。这样做是为了稳定训练。
对第一个单词的查询 (q1) 和不同单词的所有键(在本例中为 k1 和 k2)执行相同的操作。
最后,使用所有值,我们对每个值进行 softmax 运算。
然后,将这些值乘以每个单词的值 (v1, v2)。直观地讲,是为了得到每个单词相对于所选单词的重要性。不太重要的单词会被乘以 0.001 等较小的值而淹没。
最后,将所有内容加起来得到 Z 向量。
Transformer 的亮点在于可以并行计算。因此,我们不处理向量,而是处理矩阵。
实现过程保持不变。
首先,计算 Q、K 和 V 矩阵。
其次,计算注意力分数。
K_t 是K的转置
第三,对每个注意力头重复这些步骤。
这是每个注意力头的输出结果。
最后,将所有注意力头的输出连接起来,并乘以矩阵 WO(与模型一起训练)以获得最终的注意力分数。
以下是所有过程的总结:
上图展示了从输入句子到输出自注意力结果的完整步骤。它主要分为以下几个阶段:
-
输入与嵌入 (1, 2):
- 输入句子: 开始时,我们有一个句子,例如“Delicious Pizza”。
- 词嵌入: 句中的每个词都被转化为一个向量表示,也就是词嵌入。图中使用“X”表示词嵌入后的矩阵。
- 注意: 对于第一个编码器层(encoder_1),我们直接使用词嵌入“X”。而在后续的编码器层(encoder_2, encoder_3,…),我们则使用前一个编码器层的输出,用“R”表示。
-
多头处理 (3):
- 权重矩阵: 将嵌入后的矩阵“X”或“R” 分别与三个权重矩阵 (W_Q, W_K, W_V) 相乘。 这些权重矩阵是模型训练过程中学习到的参数,每个头都有自己独立的W_Q, W_K, W_V 。
- Q (Query) 查询向量: W_Q * X (或 R)
- K (Key) 键向量: W_K * X (或 R)
- V (Value) 值向量: W_V * X (或 R)
- 多头 (8 heads): 这里,输入数据被“分割”成了 8 个“头”,每个头都有一组不同的W_Q, W_K, W_V 矩阵,分别计算得到 Q1,K1,V1,Q2,K2,V2… Q8,K8,V8。 这允许模型从不同的角度关注输入信息,从而捕捉更丰富的信息。 图中,“**8” 是一个架构选择,可以改为其他数字,比如64。
- 权重矩阵: 将嵌入后的矩阵“X”或“R” 分别与三个权重矩阵 (W_Q, W_K, W_V) 相乘。 这些权重矩阵是模型训练过程中学习到的参数,每个头都有自己独立的W_Q, W_K, W_V 。
-
计算注意力得分 (4):
- 每个头分别计算 Q,K,V 的点积,得到注意力分数。 这是一个关键步骤,它决定了在输入句子中哪些词需要被赋予更大的关注权重。 每个头输出的注意力结果表示为 QK/sqrt(d_k) 之后再经过 softmax 归一化 (图中省略了这一步)。
- d_k 为每个头 K向量的维度
-
注意力加权 (4):
- 根据上一步计算得到的注意力分数,对 V (Value) 向量进行加权求和。 这将产生每个头关注加权后的输出。图中以Z1, Z2,… Z8表示。
-
合并与最终输出 (5):
- 拼接: 每个头的注意力结果 (Z1, Z2,… Z8) 被拼接在一起,形成一个更大的矩阵。
- 投影: 将拼接后的矩阵与一个额外的权重矩阵 WO (W 输出) 相乘,得到最终的自注意力输出“Z”。 这个WO 矩阵也是模型训练过程中学习到的参数。
总结:
这个图的核心思想是:
- 多头机制: 通过将输入数据分割成多个“头”进行处理,使模型能够从多个不同的角度捕捉输入信息,并提高模型的表达能力。
- 自注意力机制: 通过计算 Q,K,V 的点积和加权,让模型在处理一个词的时候,同时考虑到句子中的其他词。
- 线性变换 (W_Q, W_K, W_V, WO): 通过与这些权重矩阵的乘法,将输入数据进行转换和整合,最终产生模型的输出。
关键点:
- Encoder vs. 其他层: 图中标注说明,对于第一个encoder层,输入是词嵌入X,而后续encoder层则使用前一层encoder的输出R作为输入。
- 架构选择: 图中标注了“8”头是一种架构选择,可以使用其他数字。
现在,让我们来理解为什么它会起作用:
忘记多头注意力、注意力模块和所有术语。
想象一下,你在 A 点,想去大城市中的 B 点。
你认为只有一条路可以到达那里吗?当然不是,有成千上万种方法可以到达那个点。
但是,在你尝试过很多方法之前,你永远不会知道最佳路径。越多越好。
因此,单个矩阵乘法无法获得查询和键的最佳表示。
可以进行多个查询,可以为每个查询完成多个键。
这就是为什么我们要进行如此多的矩阵乘法,以尝试获得与用户提出的问题相关的查询的最佳键。
为了可视化自注意力如何创建不同的表示。让我们看一下单词“apple苹果”、“market市场”和“cellphone手机”的三种不同表示。
哪种表示最能回答以下问题?
- What does the company apple make苹果公司生产什么?
表示 2 是回答此问题的最佳选择,它给出的答案是“手机”,因为这是与它最接近的答案。
那下一个问题呢?
- Where can I get a new iphone我在哪里可以买到新的 iPhone?
在这种情况下,表示 3 将是最佳选择,我们将得到答案“市场”。
(上边三种不同表示这些是线性变换,可以应用于任何矩阵,第三个称为剪切运算)
1.2 理解位置编码 (Positional Encoding)
为了理解什么是位置编码以及为什么需要它,让我们想象一下没有位置编码的情况。
首先,输入句子,例如“Pramod likes to eat pizza with his friends Pramod 喜欢和他的朋友一起吃披萨”。将被分解成各自的单词*。
“Pramod”、“likes喜欢”、“eat吃”、“pizza披萨”、“ with和”、“his他的”、“friends朋友”
其次,每个单词都将被转换为给定维度的嵌入。
现在,如果没有位置编码。模型没有关于单词相对位置的信息。(因为所有内容都是并行处理的)
因此,这个句子与“Pramod likes to eat eat friends with hiss pizza 喜欢和他的披萨朋友一起吃”或任何其他单词排列没有什么不同。
因此,我们需要 PE(位置编码)的原因是为了告诉模型不同单词彼此之间的相对位置。
现在,这种 PE 的首选特性是什么:
- 每个位置的唯一编码: 否则,它将不断地为不同长度的句子改变。对于 10 个单词的句子,位置 2 将不同于 100 个单词的句子中同样的2。这将妨碍训练,因为没有可以遵循的可预测模式。
- 两个编码位置之间的线性关系: 如果我知道一个单词的位置 p,那么应该很容易计算另一个单词的位置 p+k。这将使模型更容易学习模式。
- 扩展到为比训练中遇到的更长的序列: 如果模型受到训练中使用句子长度的限制,它将永远无法在现实世界中工作。
- 由模型可以学习的确定性过程生成: 它应该是一个简单的公式或容易计算的算法。为了帮助我们的模型更好地泛化。
- 可扩展到多个维度: 不同的场景可能有不同的维度,我们希望它在所有情况下都能工作。
1.2.1 整数编码
阅读上述条件,任何人的第一想法都会是。“为什么不直接添加单词的位置?”这种简单的方法适用于短句子。但是对于较长的句子,例如一篇包含 2000 个单词的文章,添加位置 2000 可能会导致梯度爆炸或消失。
还有其他替代方法,例如规范化整数编码、二进制编码。但每种方法都有其自身的问题。要详细了解更多信息,请参阅此处You could have designed state of the art positional encoding。
1.2.2 正弦编码
满足我们所有条件的一种编码方法是使用正弦函数。如论文中所述。
但是,如果正弦满足所有条件,为什么还要交替使用余弦呢?
好吧,正弦不能满足所有条件,而只能满足大多数条件。正弦不能满足我们对线性关系的需求,因此我们也需要余弦。在此,我将提供一个简单的证明,该证明取自此处Linear Relationships in the Transformer’s Positional Encoding。
考虑一系列正弦和余弦对,每个都与一个频率wi相关联。我们的目标是找到一个线性变换矩阵
M,该矩阵可以将这些正弦函数移动一个固定的偏移量k:
频率wi遵循一个几何级数,该几何级数随着维数索引i的减小而减小,定义为:
为了找到这个变换矩阵,我们可以把它表示为一个具有未知系数的通用 2×2 矩阵:
通过将三角加法定理和差化积应用于右侧,我们可以将其展开为:
通过匹配系数,此展开式为我们提供了一个由两个方程组成的系统:
通过比较两边的项,我们可以求解未知的系数:
这些解为我们提供了最终的变换矩阵:
现在我们了解了什么是 PE 以及为什么使用正弦和余弦。让我们了解它是如何工作的。
pos = 单词在句子中的位置(“Pramod likes pizza 喜欢披萨”,Pramod 在位置 0,likes在 1,依此类推)
i = 嵌入的第 i 个和 (i+1) 个索引的值,偶数列号的正弦,奇数列号的余弦(“Pramod”被转换为嵌入的向量。它具有不同的索引)
d_model = 模型的维度(在我们的例子中为 512)
10,000 (n) = 这是一个实验确定的常数
正如你所看到的,使用这个公式,我们可以计算每个位置的 PE 值以及该位置的所有索引。这是一个简单的图示,显示了它是如何完成的。
现在,扩展上面的内容,这是它的函数形式:
更多可视化请参考.
这是 n = 10,000、d_model = 10,000 和序列长度 = 100 时原始数据的样子。此处提供生成代码:
将它想象成这样,y 轴上的每个索引代表一个单词,而 x 轴上对应于该索引的所有内容都是它的位置编码。
1.3 理解编码器和解码器模块
如果到目前为止一切都说得通,那么这对你来说将是轻而易举的。因为这正是我们将所有内容组合在一起的地方。
单个 Transformer 可以有多个编码器以及解码器模块。
1.3.1 编码器
让我们首先从编码器部分开始。
它由多个编码器组成,每个编码器模块包含以下部分:
- 多头注意力机制
- 残差连接
- 层归一化
- 前馈网络
残差连接
我们已经详细讨论了多头注意力机制,所以让我们来谈谈剩下的三个。
残差连接,也称为跳跃连接,正如其名称所暗示的那样。它们获取输入,跳过一个模块,并将其传递到下一个模块。
层归一化
层归一化是批归一化之后的一种发展。在我们谈论这两者中的任何一个之前,我们必须了解什么是归一化。
归一化是一种将不同特征置于同一尺度的方法,这样做是为了稳定训练。因为当模型尝试从尺度差异很大的特征中学习时,它会减慢训练速度并导致梯度爆炸。在此处。
批归一化是一种方法,其中从未来层减去整个批次的均值和标准差。
上图来自这里
在层归一化中,不是关注整个批次,而是关注单个实例的所有特征。
你可以这样理解,我们从一个句子中取每个单词,并规范化该单词。
为了更好地理解,请阅读此博客。
前馈网络
添加前馈网络 (FFN) 是为了向模型引入非线性和复杂性。虽然注意力机制非常擅长捕获序列中不同位置之间的关系,但它本质上仍然是一种线性运算(如前所述)。
FFN 通过其激活函数(通常为 ReLU)添加非线性,使模型能够学习更复杂的模式和变换,而纯注意力机制无法单独捕获这些模式和变换。
你可以这样理解:如果注意力机制就像进行对话,每个人都可以与其他人交谈(全局交互),那么 FFN 就像给每个人时间来深入思考他们所听到的内容并独立处理它(局部处理)。两者对于有效理解和转换输入都是必要的。如果没有 FFN,Transformer 学习复杂函数的能力将受到严重限制,并且本质上将仅限于通过注意力机制进行的加权平均运算。
解码器模块
编码器的输出作为数据处理中的键和值矩阵馈送到每个解码器模块。解码器模块是自回归的。这意味着它一个接一个地输出,并将其自身的输出作为输入。
- 解码器模块从编码器获取键和值,并从之前的输出中创建自己的查询。
- 使用第一步的输出,它移动到步骤 2,其中从先前的解码器模块的输出作为查询,键,值从编码器中获取。
- 此过程会重复进行,直到我们从解码器获得输出,然后将其作为创建下一个标记的输入
- 重复此过程,直到我们到达标记
解码器模块中也有一个小的变化,即我们应用一个掩码,以使自注意力机制仅关注输出序列中较早的位置。
这就是你能够编写自己的 Transformer 所需要的所有高级理解。现在让我们看一下论文以及代码。
1.4 最终线性层和 Softmax 层
解码器输出数字向量(通常是浮点数),该向量将发送到线性层。
线性层输出词汇表中每个单词的分数(训练数据集中唯一单词的数量)。
然后,将其发送到 softmax 层,该层将这些分数转换为概率。并给出具有最高概率的单词。(通常是这种情况,有时我们可以将其设置为获取第二个最有可能的单词,或第三个最有可能的单词等等)
2 编写 Transformer 的代码
对于以下部分,我建议你打开 3 个选项卡。本博客、Jupyter 笔记本和原始论文。
2.1 摘要和引言
本节将向你介绍论文的内容以及最初的编写原因。
有一些概念可以帮助你学习新东西,例如 RNN、卷积神经网络CNN和关于 BLEU。
重要的是要知道,Transformer 最初是为文本到文本翻译而创建的。即从一种语言到另一种语言。
因此,它们具有编码器部分和解码器部分。它们传递信息,这被称为交叉注意力(稍后会详细介绍自注意力和交叉注意力之间的区别)。
2.2 背景
本节通常会讨论该领域先前完成的工作、已知问题以及人们用来修复它们的方法。我们需要记住一件非常重要的事情。
“跟踪遥远的信息(长文本)”。Transformer 之所以令人惊叹,有很多原因,但关键的一点是它们可以记住遥远的关系。
随着句子变长,RNN 和 LSTM 等解决方案会丢失上下文含义。但是 Transformer 不会遇到这样的问题。(不过,当您阅读时希望不存在的问题。这里和长度是上下文窗口长度,这决定了 Transformer 可以看到多少信息)
2.3 模型架构
我们一直在等待的部分。我将在这里稍微偏离论文。因为我发现更容易遵循数据的处理过程。此外,如果你阅读该论文,它的每个单词都应该对你来说是有意义的。
我们将首先从多头注意力机制开始,然后是前馈网络,最后是位置编码,使用这些我们将完成编码器层,随后我们将转到解码器层,之后我们将编写编码器和解码器模块,最后以编写整个 Transformer 在真实世界数据上的训练循环结束。
完整的笔记本可以在这里找到
2.4 代码
必要的导入
import math
import torch
import torch.nn as nn
from torch.nn.functional import softmax
2.4.1 多头注意力机制MHA
到目前为止,你应该对注意力机制的工作原理有很好的了解,因此让我们首先从编写缩放点积注意力机制(scaled dot-product attention)的代码开始(因为 MHA 基本上是将多个缩放点积堆叠在一起)。参考第 3.2.1 节缩放点积注意力机制
张量大小、矩阵乘法、掩码填充
# 我的实现
def scaled_dot_product_attention(query, key, value, mask=None):"""计算缩放点积注意力参数说明:query: (batch_size, num_heads, seq_len_q, d_k) 查询张量key: (batch_size, num_heads, seq_len_k, d_k) 键张量value: (batch_size, num_heads, seq_len_v, d_v) 值张量mask: 可选的掩码张量,用于防止注意力访问某些位置"""# 检查输入张量的维度assert query.dim() == 4, f"Query应该是4维张量,但得到了{query.dim()}维"assert key.size(-1) == query.size(-1), "Key和Query的深度维度必须相等"assert key.size(-2) == value.size(-2), "Key和Value的序列长度必须相等"# 获取深度维度大小d_k = query.size(-1)# 计算注意力分数# 将query和key的转置相乘,然后除以sqrt(d_k)进行缩放scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)# 如果提供了掩码,将掩码位置的注意力分数设为负无穷if mask is not None:scores = scores.masked_fill(mask == 0, float('-inf'))# 对分数使用softmax得到注意力权重attention_weights = softmax(scores, dim=-1)# 将注意力权重与value相乘得到输出return torch.matmul(attention_weights, value)
使用这个,让我们完成 MHA
class MultiHeadAttention(nn.Module):#让我只为这个类编写初始化程序,以便你了解它的完成方式def __init__(self, d_model, num_heads):super().__init__()assert d_model % num_heads == 0, "d_model must be divisible by num_heads" #思考为什么?self.d_model = d_modelself.num_heads = num_headsself.d_k = d_model // num_heads # 注意:使用整数除法 //# 创建可学习的投影矩阵self.W_q = nn.Linear(d_model, d_model) #思考为什么我们从 d_model -> d_modelself.W_k = nn.Linear(d_model, d_model)self.W_v = nn.Linear(d_model, d_model)self.W_o = nn.Linear(d_model, d_model)@staticmethoddef scaled_dot_product_attention(query, key, value, mask=None):# YOUR IMPLEMENTATION HEREdef forward(self, query, key, value, mask=None):# get batch_size and sequence length# YOUR CODE HERE# 1. Linear projections# YOUR CODE HERE# 2. Split into heads# YOUR CODE HERE# 3. Apply attention# YOUR CODE HERE# 4. Concatenate heads# YOUR CODE HERE# 5. Final projection# YOUR CODE HERE
- 我很难理解 view 和 transpose 之间的区别。以下 2 个链接应该可以帮助你:何时使用 view、transpose 和 permute 以及 view 和 transpose 之间的差异
- Contiguous 和 view,我仍然不明白。直到我阅读了这些:Pytorch Internals 和 Contiguous & Non-Contiguous Tensor
- Linear
- 我还有一篇关于张量内部内存管理如何工作的文章,如果你有兴趣,请阅读此文章。
#我的实现
class MultiHeadAttention(nn.Module):def __init__(self, d_model, num_heads):"""多头注意力层初始化Args:d_model: 模型的维度num_heads: 注意力头的数量"""super().__init__()assert d_model % num_heads == 0, "d_model必须能被num_heads整除"self.d_model = d_model # 模型维度self.num_heads = num_heads # 注意力头数量self.d_k = d_model // num_heads # 每个头的维度# 创建可学习的投影矩阵# 每个矩阵的形状都是 (d_model, d_model)self.W_q = nn.Linear(d_model, d_model) # Query的变换矩阵self.W_k = nn.Linear(d_model, d_model) # Key的变换矩阵self.W_v = nn.Linear(d_model, d_model) # Value的变换矩阵self.W_o = nn.Linear(d_model, d_model) # 输出的变换矩阵@staticmethoddef scaled_dot_product_attention(query, key, value, mask=None):"""计算缩放点积注意力Args:query: shape (batch_size, num_heads, seq_len_q, d_k)key: shape (batch_size, num_heads, seq_len_k, d_k) value: shape (batch_size, num_heads, seq_len_v, d_v)mask: 可选的掩码张量Returns:注意力输出: shape (batch_size, num_heads, seq_len_q, d_v)"""# Shape checksassert query.dim() == 4, f"Query should be 4-dim but got {query.dim()}-dim"assert key.size(-1) == query.size(-1), "Key and query depth must be equal"assert key.size(-2) == value.size(-2), "Key and value sequence length must be equal"d_k = query.size(-1)# Attention scoresscores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)if mask is not None:scores = scores.masked_fill(mask == 0, float('-inf'))attention_weights = softmax(scores, dim=-1)return torch.matmul(attention_weights, value)def forward(self, query, key, value, mask=None):"""前向传播Args:query: shape (batch_size, seq_len, d_model)key: shape (batch_size, seq_len, d_model)value: shape (batch_size, seq_len, d_model)mask: 可选的掩码张量Returns:输出: shape (batch_size, seq_len, d_model)"""batch_size = query.size(0)seq_len = query.size(1)# 1. 线性变换# shape: (batch_size, seq_len, d_model)Q = self.W_q(query)K = self.W_k(key)V = self.W_v(value)# 2. 将张量分割成多个注意力头# 重塑形状: (batch_size, seq_len, num_heads, d_k)# 转置后: (batch_size, num_heads, seq_len, d_k)Q = Q.view(batch_size, seq_len, self.num_heads, self.d_k).transpose(1, 2)K = K.view(batch_size, seq_len, self.num_heads, self.d_k).transpose(1, 2)V = V.view(batch_size, seq_len, self.num_heads, self.d_k).transpose(1, 2)# 3. 应用注意力机制# output shape: (batch_size, num_heads, seq_len, d_k)output = self.scaled_dot_product_attention(Q, K, V, mask)# 4. 合并多头# 转置回来: (batch_size, seq_len, num_heads, d_k)# 重塑形状: (batch_size, seq_len, d_model)output = output.transpose(1, 2).contiguous().view(batch_size, seq_len, self.d_model)# 5. 最终的线性变换# shape: (batch_size, seq_len, d_model)return self.W_o(output)
2.4.2 前馈网络
描述它的另一种方式是使用内核大小为 1 的两个卷积。输入和输出的维度为 dmodel = 512,内部层的维度为 df f = 2048
class FeedForwardNetwork(nn.Module):"""Position-wise Feed-Forward NetworkArgs:d_model: input/output dimensiond_ff: hidden dimensiondropout: dropout rate (default=0.1)"""def __init__(self, d_model, d_ff, dropout=0.1):super().__init__()#create a sequential ff model as mentioned in section 3.3#YOUR CODE HEREdef forward(self, x):"""Args:x: Input tensor of shape (batch_size, seq_len, d_model)Returns:Output tensor of shape (batch_size, seq_len, d_model)"""#YOUR CODE HERE
- Dropout
- 在哪里放置 Dropout
- ReLU
#我的实现
class FeedForwardNetwork(nn.Module):"""Position-wise Feed-Forward NetworkArgs:d_model: input/output dimensiond_ff: hidden dimensiondropout: dropout rate (default=0.1)"""def __init__(self, d_model, d_ff, dropout=0.1):super().__init__()self.model = nn.Sequential(nn.Linear(d_model, d_ff),nn.ReLU(),nn.Dropout(dropout),nn.Linear(d_ff, d_model),nn.Dropout(dropout))def forward(self, x):"""Args:x: Input tensor of shape (batch_size, seq_len, d_model)Returns:Output tensor of shape (batch_size, seq_len, d_model)"""return self.model(x)
2.4.3 位置编码
class PositionalEncoding(nn.Module):def __init__(self, d_model, max_seq_length=5000):super().__init__()# Create matrix of shape (max_seq_length, d_model)#YOUR CODE HERE# Create position vector#YOUR CODE HERE# Create division term#YOUR CODE HERE# Compute positional encodings#YOUR CODE HERE# Register buffer#YOUR CODE HEREdef forward(self, x):"""Args:x: Tensor shape (batch_size, seq_len, d_model)"""# YOUR CODE HERE
class PositionalEncoding(nn.Module):def __init__(self, d_model, max_seq_length=5000):super().__init__()# Create matrix of shape (max_seq_length, d_model)pe = torch.zeros(max_seq_length, d_model)# Create position vectorposition = torch.arange(0, max_seq_length).unsqueeze(1) # Shape: (max_seq_length, 1)# Create division termdiv_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))# Compute positional encodingspe[:, 0::2] = torch.sin(position * div_term)pe[:, 1::2] = torch.cos(position * div_term)# Register bufferself.register_buffer('pe', pe.unsqueeze(0)) # Shape: (1, max_seq_length, d_model)def forward(self, x):"""Args:x: Tensor shape (batch_size, seq_len, d_model)"""return x + self.pe[:, :x.size(1)] # Add positional encoding up to sequence length
2.4.4 编码器层
class EncoderLayer(nn.Module):def __init__(self, d_model, num_heads, d_ff, dropout=0.1):super().__init__()# 1. Multi-head attentionself.mha = MultiHeadAttention(d_model,num_heads)# 2. Layer normalizationself.layer_norm_1 = nn.LayerNorm(d_model)# 3. Feed forwardself.ff = FeedForwardNetwork(d_model,d_ff)# 4. Another layer normalizationself.layer_norm_2 = nn.LayerNorm(d_model)# 5. Dropoutself.dropout = nn.Dropout(dropout)def forward(self, x, mask=None):"""Args:x: Input tensor of shape (batch_size, seq_len, d_model)mask: Optional mask for paddingReturns:x: Output tensor of shape (batch_size, seq_len, d_model)"""# 1. Multi-head attention with residual connection and layer norm# att_output = self.attention(...)# x = x + att_output # residual connection# x = self.norm1(x) # layer normalizationatt_output = self.mha(x, x, x, mask)x = self.dropout(x + att_output) # Apply dropout after residualx = self.layer_norm_1(x)ff_output = self.ff(x)x = self.dropout(x + ff_output) # Apply dropout after residualx = self.layer_norm_2(x)# 2. Feed forward with residual connection and layer normreturn x
2.4.5 解码器层
class DecoderLayer(nn.Module):def __init__(self, d_model, num_heads, d_ff, dropout=0.1):super().__init__()# 1. Masked Multi-head attentionself.mha_1 = MultiHeadAttention(d_model,num_heads)# 2. Layer norm for first sub-layerself.layer_norm_1 = nn.LayerNorm(d_model)# 3. Multi-head attention for cross attention with encoder output# This will take encoder output as key and valueself.mha_2 = MultiHeadAttention(d_model,num_heads)# 4. Layer norm for second sub-layerself.layer_norm_2 = nn.LayerNorm(d_model)# 5. Feed forward networkself.ff = FeedForwardNetwork(d_model,d_ff)# 6. Layer norm for third sub-layerself.layer_norm_3 = nn.LayerNorm(d_model)# 7. Dropoutself.dropout = nn.Dropout(dropout)def forward(self, x, encoder_output, src_mask=None, tgt_mask=None):"""Args:x: Target sequence embedding (batch_size, target_seq_len, d_model)encoder_output: Output from encoder (batch_size, source_seq_len, d_model)src_mask: Mask for source paddingtgt_mask: Mask for target padding and future positions"""# 1. Masked self-attention# Remember: In decoder self-attention, query, key, value are all xatt_output = self.mha_1(x,x,x,tgt_mask)x = self.dropout(x + att_output)x = self.layer_norm_1(x)att_output_2 = self.mha_2(x, encoder_output,encoder_output, src_mask)x = self.dropout(x + att_output_2)x = self.layer_norm_2(x)ff_output = self.ff(x)x = self.dropout(x + ff_output)x = self.layer_norm_3(x)return x
2.4.6 编码器
class Encoder(nn.Module):def __init__(self,vocab_size,d_model,num_layers=6,num_heads=8,d_ff=2048,dropout=0.1,max_seq_length=5000):super().__init__()# 1. Input embeddingself.embeddings = nn.Embedding(vocab_size, d_model)self.scale = math.sqrt(d_model)# 2. Positional encodingself.pe = PositionalEncoding(d_model, max_seq_length)# 3. Dropoutself.dropout = nn.Dropout(dropout)# 4. Stack of N encoder layersself.encoder_layers = nn.ModuleList([EncoderLayer(d_model, num_heads, d_ff, dropout)for _ in range(num_layers)])def forward(self, x, mask=None):"""Args:x: Input tokens (batch_size, seq_len)mask: Mask for padding positionsReturns:encoder_output: (batch_size, seq_len, d_model)"""# 1. Pass through embedding layer and scalex = self.embeddings(x) * self.scale# 2. Add positional encoding and apply dropoutx = self.dropout(self.pe(x))# 3. Pass through each encoder layerfor layer in self.encoder_layers:x = layer(x, mask)return x
2.4.7 解码器
class Decoder(nn.Module):def __init__(self,vocab_size,d_model,num_layers=6,num_heads=8,d_ff=2048,dropout=0.1,max_seq_length=5000):super().__init__()# 1. Output embeddingself.embeddings = nn.Embedding(vocab_size, d_model)self.scale = math.sqrt(d_model)# 2. Positional encodingself.pe = PositionalEncoding(d_model, max_seq_length)# 3. Dropoutself.dropout = nn.Dropout(dropout)# 4. Stack of N decoder layersself.decoder_layers = nn.ModuleList([DecoderLayer(d_model, num_heads, d_ff, dropout)for _ in range(num_layers)])def forward(self, x, encoder_output, src_mask=None, tgt_mask=None):"""Args:x: Target tokens (batch_size, target_seq_len)encoder_output: Output from encoder (batch_size, source_seq_len, d_model)src_mask: Mask for source paddingtgt_mask: Mask for target padding and future positionsReturns:decoder_output: (batch_size, target_seq_len, d_model)"""# 1. Pass through embedding layer and scalex = self.embeddings(x) * self.scale# 2. Add positional encoding and dropoutx = self.dropout(self.pe(x))# 3. Pass through each decoder layerfor layer in self.decoder_layers:x = layer(x, encoder_output, src_mask, tgt_mask)return x
实用代码
def create_padding_mask(seq):"""Create mask for padding tokens (0s)Args:seq: Input sequence tensor (batch_size, seq_len)Returns:mask: Padding mask (batch_size, 1, 1, seq_len)"""batch_size, seq_len = seq.shapeoutput = torch.eq(seq, 0).float()return output.view(batch_size, 1, 1, seq_len)def create_future_mask(size):"""Create mask to prevent attention to future positionsArgs:size: Size of square mask (target_seq_len)Returns:mask: Future mask (1, 1, size, size)"""# Create upper triangular matrix and invert itmask = torch.triu(torch.ones((1, 1, size, size)), diagonal=1) == 0return maskdef create_masks(src, tgt):"""Create all masks needed for trainingArgs:src: Source sequence (batch_size, src_len)tgt: Target sequence (batch_size, tgt_len)Returns:src_mask: Padding mask for encodertgt_mask: Combined padding and future mask for decoder"""# 1. Create padding maskssrc_padding_mask = create_padding_mask(src)tgt_padding_mask = create_padding_mask(tgt)# 2. Create future masktgt_len = tgt.size(1)tgt_future_mask = create_future_mask(tgt_len)# 3. Combine padding and future mask for target# Both masks should be True for allowed positionstgt_mask = tgt_padding_mask & tgt_future_maskreturn src_padding_mask, tgt_mask
2.4.8 整个Transformer
class Transformer(nn.Module):def __init__(self,src_vocab_size,tgt_vocab_size,d_model,num_layers=6,num_heads=8,d_ff=2048,dropout=0.1,max_seq_length=5000):super().__init__()# Pass all necessary parameters to Encoder and Decoderself.encoder = Encoder(src_vocab_size,d_model,num_layers,num_heads,d_ff,dropout,max_seq_length)self.decoder = Decoder(tgt_vocab_size,d_model,num_layers,num_heads,d_ff,dropout,max_seq_length)# The final linear layer should project from d_model to tgt_vocab_sizeself.final_layer = nn.Linear(d_model, tgt_vocab_size)def forward(self, src, tgt):# Create masks for source and targetsrc_mask, tgt_mask = create_masks(src, tgt)# Pass through encoderencoder_output = self.encoder(src, src_mask)# Pass through decoderdecoder_output = self.decoder(tgt, encoder_output, src_mask, tgt_mask)# Project to vocabulary sizeoutput = self.final_layer(decoder_output)# Note: Usually don't apply softmax here if using CrossEntropyLoss# as it applies log_softmax internallyreturn output
Transformer实用代码
class TransformerLRScheduler:def __init__(self, optimizer, d_model, warmup_steps):"""Args:optimizer: Optimizer to adjust learning rate ford_model: Model dimensionalitywarmup_steps: Number of warmup steps"""# Your code here# lrate = d_model^(-0.5) * min(step_num^(-0.5), step_num * warmup_steps^(-1.5))self.optimizer = optimizerself.d_model = d_modelself.warmup_steps = warmup_stepsdef step(self, step_num):"""Update learning rate based on step number"""# Your code here - implement the formulalrate = torch.pow(self.d_model,-0.5)*torch.min(torch.pow(step_num,-0.5), torch.tensor(step_num) * torch.pow(self.warmup_steps,-1.5))class LabelSmoothing(nn.Module):def __init__(self, smoothing=0.1):super().__init__()self.smoothing = smoothingself.confidence = 1.0 - smoothingdef forward(self, logits, target):"""Args:logits: Model predictions (batch_size, vocab_size) #each row of vocab_size contains probability score of each labeltarget: True labels (batch_size) #each row of batch size contains the index to the correct label"""vocab_size = logits.size(-1)with torch.no_grad():# Create a soft target distributiontrue_dist = torch.zeros_like(logits) #create the zeros [0,0,...]true_dist.fill_(self.smoothing / (vocab_size - 1)) #fill with calculated value [0.000125..,0.000125...] (this is an arbitarary value for example purposes)true_dist.scatter_(1, target.unsqueeze(1), self.confidence) #add 1 to the correct index (read more on docs of pytorch)return torch.mean(torch.sum(-true_dist * torch.log_softmax(logits, dim=-1), dim=-1)) #return cross entropy loss
2.4.9 训练Transformers
def train_transformer(model, train_dataloader, criterion, optimizer, scheduler, num_epochs, device='cuda'):"""Training loop for transformerArgs:model: Transformer modeltrain_dataloader: DataLoader for training datacriterion: Loss function (with label smoothing)optimizer: Optimizerscheduler: Learning rate schedulernum_epochs: Number of training epochs"""# 1. Setupmodel = model.to(device)model.train()# For tracking training progresstotal_loss = 0all_losses = []# 2. Training loopfor epoch in range(num_epochs):print(f"Epoch {epoch + 1}/{num_epochs}")epoch_loss = 0for batch_idx, batch in enumerate(train_dataloader):# Get source and target batchessrc = batch['src'].to(device)tgt = batch['tgt'].to(device)# Create maskssrc_mask, tgt_mask = create_masks(src, tgt)# Prepare target for input and output# Remove last token from target for inputtgt_input = tgt[:, :-1]# Remove first token from target for outputtgt_output = tgt[:, 1:]# Zero gradientsoptimizer.zero_grad()# Forward passoutputs = model(src, tgt_input, src_mask, tgt_mask)# Reshape outputs and target for loss calculationoutputs = outputs.view(-1, outputs.size(-1))tgt_output = tgt_output.view(-1)# Calculate lossloss = criterion(outputs, tgt_output)# Backward passloss.backward()# Clip gradientstorch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)# Update weightsoptimizer.step()scheduler.step()# Update loss trackingepoch_loss += loss.item()# Print progress every N batchesif batch_idx % 100 == 0:print(f"Batch {batch_idx}, Loss: {loss.item():.4f}")# Calculate average loss for epochavg_epoch_loss = epoch_loss / len(train_dataloader)all_losses.append(avg_epoch_loss)print(f"Epoch {epoch + 1} Loss: {avg_epoch_loss:.4f}")# Save checkpointtorch.save({'epoch': epoch,'model_state_dict': model.state_dict(),'optimizer_state_dict': optimizer.state_dict(),'loss': avg_epoch_loss,}, f'checkpoint_epoch_{epoch+1}.pt')return all_losses
设置Dataset和DataLoader
import os
import torch
import spacy
import urllib.request
import zipfile
from torch.utils.data import Dataset, DataLoaderdef download_multi30k():"""Download Multi30k dataset if not present"""# Create data directoryif not os.path.exists('data'):os.makedirs('data')# Download files if they don't existbase_url = "https://raw.githubusercontent.com/multi30k/dataset/master/data/task1/raw/"files = {"train.de": "train.de.gz","train.en": "train.en.gz","val.de": "val.de.gz","val.en": "val.en.gz","test.de": "test_2016_flickr.de.gz","test.en": "test_2016_flickr.en.gz"}for local_name, remote_name in files.items():filepath = f'data/{local_name}'if not os.path.exists(filepath):url = base_url + remote_nameurllib.request.urlretrieve(url, filepath + '.gz')os.system(f'gunzip -f {filepath}.gz')def load_data(filename):"""Load data from file"""with open(filename, 'r', encoding='utf-8') as f:return [line.strip() for line in f]def create_dataset():"""Create dataset from files"""# Download data if neededdownload_multi30k()# Load datatrain_de = load_data('data/train.de')train_en = load_data('data/train.en')val_de = load_data('data/val.de')val_en = load_data('data/val.en')return (train_de, train_en), (val_de, val_en)class TranslationDataset(Dataset):def __init__(self, src_texts, tgt_texts, src_vocab, tgt_vocab, src_tokenizer, tgt_tokenizer):self.src_texts = src_textsself.tgt_texts = tgt_textsself.src_vocab = src_vocabself.tgt_vocab = tgt_vocabself.src_tokenizer = src_tokenizerself.tgt_tokenizer = tgt_tokenizerdef __len__(self):return len(self.src_texts)def __getitem__(self, idx):src_text = self.src_texts[idx]tgt_text = self.tgt_texts[idx]# Tokenizesrc_tokens = [tok.text for tok in self.src_tokenizer(src_text)]tgt_tokens = [tok.text for tok in self.tgt_tokenizer(tgt_text)]# Convert to indicessrc_indices = [self.src_vocab["<s>"]] + [self.src_vocab[token] for token in src_tokens] + [self.src_vocab["</s>"]]tgt_indices = [self.tgt_vocab["<s>"]] + [self.tgt_vocab[token] for token in tgt_tokens] + [self.tgt_vocab["</s>"]]return {'src': torch.tensor(src_indices),'tgt': torch.tensor(tgt_indices)}def build_vocab_from_texts(texts, tokenizer, min_freq=2):"""Build vocabulary from texts"""counter = {}for text in texts:for token in [tok.text for tok in tokenizer(text)]:counter[token] = counter.get(token, 0) + 1# Create vocabularyvocab = {"<s>": 0, "</s>": 1, "<blank>": 2, "<unk>": 3}idx = 4for word, freq in counter.items():if freq >= min_freq:vocab[word] = idxidx += 1return vocabdef create_dataloaders(batch_size=32):# Load tokenizersspacy_de = spacy.load("de_core_news_sm")spacy_en = spacy.load("en_core_web_sm")# Get data(train_de, train_en), (val_de, val_en) = create_dataset()# Build vocabulariesvocab_src = build_vocab_from_texts(train_de, spacy_de)vocab_tgt = build_vocab_from_texts(train_en, spacy_en)# Create datasetstrain_dataset = TranslationDataset(train_de, train_en,vocab_src, vocab_tgt,spacy_de, spacy_en)val_dataset = TranslationDataset(val_de, val_en,vocab_src, vocab_tgt,spacy_de, spacy_en)# Create dataloaderstrain_dataloader = DataLoader(train_dataset,batch_size=batch_size,shuffle=True,collate_fn=collate_batch)val_dataloader = DataLoader(val_dataset,batch_size=batch_size,shuffle=False,collate_fn=collate_batch)return train_dataloader, val_dataloader, vocab_src, vocab_tgtdef collate_batch(batch):src_tensors = [item['src'] for item in batch]tgt_tensors = [item['tgt'] for item in batch]# Pad sequencessrc_padded = torch.nn.utils.rnn.pad_sequence(src_tensors, batch_first=True, padding_value=2)tgt_padded = torch.nn.utils.rnn.pad_sequence(tgt_tensors, batch_first=True, padding_value=2)return {'src': src_padded,'tgt': tgt_padded}
开始训练
# Initialize your transformer with the vocabulary sizes
model = Transformer(src_vocab_size=len(vocab_src),tgt_vocab_size=len(vocab_tgt),d_model=512,num_layers=6,num_heads=8,d_ff=2048,dropout=0.1
)
criterion = LabelSmoothing(smoothing=0.1).to(device)# Now you can use your training loop
losses = train_transformer(model=model,train_dataloader=train_dataloader,criterion=criterion,optimizer=optimizer,scheduler=scheduler,num_epochs=10
)
3 其它及进一步阅读
以下是一些资源和更多信息,可以帮助你在学习过程中提供帮助:
什么是 torch.nn 真正的含义?
3Blue1Brown 的神经网络教程
恭喜你完成了这个教程/课程/博客,不管你如何理解它。根据人类的好奇心,你现在可能有一些问题。随时在 GitHub 上创建问题,我会将那些我认为初学者最常有的问题添加到这里。