目录
环境配置
构建Diffusion模型
位置向量
ResNet/ConvNeXT块
Attention模块
组规一化
条件U-Net
正向扩散
数据准备与处理
训练过程
推理过程
环境配置
首先进行环境配置、库的导入和一些设置操作,具体代码如下:
%%capture captured_output
# 这行可能是用于捕获后续代码块的输出
# 卸载已安装的 MindSpore 库
!pip uninstall mindspore -y
# 从指定源安装特定版本(2.2.14)的 MindSpore 库
!pip install -i https://pypi.mirrors.ustc.edu.cn/simple mindspore==2.2.14
# 查看当前安装的 MindSpore 版本
!pip show mindspore
# 导入一些必要的数学和绘图相关的库和模块
import math
from functools import partial
%matplotlib inline # 使 matplotlib 绘图可以在当前 notebook 中直接显示
import matplotlib.pyplot as plt
from tqdm.auto import tqdm # 用于显示进度条
import numpy as np
from multiprocessing import cpu_count # 获取 CPU 核心数量
from download import download # 导入自定义的下载模块 import mindspore as ms # 导入 MindSpore 库
import mindspore.nn as nn # 导入 MindSpore 中的神经网络模块
import mindspore.ops as ops # 导入 MindSpore 中的操作模块
from mindspore import Tensor, Parameter # 导入 MindSpore 中的张量和参数类
from mindspore import dtype as mstype # 导入 MindSpore 中的数据类型
from mindspore.dataset.vision import Resize, Inter, CenterCrop, ToTensor, RandomHorizontalFlip, ToPIL # 导入图像相关的操作
from mindspore.common.initializer import initializer # 导入初始化器
from mindspore.amp import DynamicLossScaler # 导入动态损失缩放器 ms.set_seed(0) # 设置 MindSpore 的随机数种子为 0,以确保结果的可重复性
运行结果:
Name: mindspore
Version: 2.2.14
Summary: MindSpore is a new open source deep learning training/inference framework that could be used for mobile, edge and cloud scenarios.
Home-page: https://www.mindspore.cn
Author: The MindSpore Authors
Author-email: contact@mindspore.cn
License: Apache 2.0
Location: /home/nginx/miniconda/envs/jupyter/lib/python3.9/site-packages
Requires: asttokens, astunparse, numpy, packaging, pillow, protobuf, psutil, scipy
Required-by:
构建Diffusion模型
首先,我们精心定义了一系列的辅助函数以及类,而这些函数和类将会在神经网络的实现过程中得以运用。与此同时,我们还特意为上采样和下采样操作设定了相应的别名。具体代码如下:
def rearrange(head, inputs): """ 这个函数接收输入 `inputs` 的形状信息和 `head` 值,将 `inputs` 重新整形 具体是将其从原始形状 `(b, hc, x, y)` 转变为 `(b, head, c, x * y)`,其中 `c = hc // head` """ b, hc, x, y = inputs.shape c = hc // head return inputs.reshape((b, head, c, x * y)) def rsqrt(x): """ 这个函数计算输入 `x` 的平方根的倒数 首先计算平方根,然后取其倒数 """ res = ops.sqrt(x) return ops.inv(res) def randn_like(x, dtype=None): """ 此函数生成与输入 `x` 形状相同的随机数数组 如果未指定 `dtype`,则使用 `x` 的数据类型;否则,按照指定的 `dtype` 生成 """ if dtype is None: dtype = x.dtype res = ops.standard_normal(x.shape).astype(dtype) return res def randn(shape, dtype=None): """ 这个函数生成指定形状 `shape` 的随机数数组 如果未指定 `dtype`,默认为 `ms.float32`;否则,按照指定的 `dtype` 生成 """ if dtype is None: dtype = ms.float32 res = ops.standard_normal(shape).astype(dtype) return res def randint(low, high, size, dtype=ms.int32): """ 生成指定范围内的随机整数数组 范围是从 `low` 到 `high`(不包括 `high`),数组大小为 `size`,数据类型为 `dtype` """ res = ops.uniform(size, Tensor(low, dtype), Tensor(high, dtype), dtype=dtype) return res def exists(x): """ 检查输入 `x` 是否不为 `None` """ return x is not None def default(val, d): """ 如果 `val` 存在(不为 `None`),则返回 `val`;否则,如果 `d` 是可调用的,则调用 `d` 并返回结果,否则直接返回 `d` """ if exists(val): return val return d() if callable(d) else d def _check_dtype(d1, d2): """ 检查两个数据类型 `d1` 和 `d2` 如果其中一个是 `ms.float32`,则返回 `ms.float32` 如果两者相同,则返回该数据类型 否则抛出 `ValueError` 异常 """ if ms.float32 in (d1, d2): return ms.float32 if d1 == d2: return d1 raise ValueError('dtype is not supported.') class Residual(nn.Cell): """ 定义了一个名为 `Residual` 的类,继承自 `nn.Cell` 在其构造函数中初始化了一个函数 `fn` 在 `construct` 方法中执行 `fn` 函数对输入 `x` 及其他参数的处理,并将结果与输入 `x` 相加后返回 """ def __init__(self, fn): super().__init__() self.fn = fn def construct(self, x, *args, **kwargs): return self.fn(x, *args, **kwargs) + x def Upsample(dim): """ 定义一个上采样函数,创建一个转置卷积层用于上采样操作,输入和输出通道数都为 `dim` """ return nn.Conv2dTranspose(dim, dim, 4, 2, pad_mode="pad", padding=1) def Downsample(dim): """ 定义一个下采样函数,创建一个卷积层用于下采样操作,输入和输出通道数都为 `dim` """ return nn.Conv2d(dim, dim, 4, 2, pad_mode="pad", padding=1)
位置向量
定义了一个被称作 SinusoidalPositionEmbeddings 的类,该类继承自 nn.Cell 。
在 __init__ 方法(即构造函数)里:
首先,初始化了一个属性 dim ,用于存储输入的维度数值。
接着,计算出 half_dim ,其为 dim 的一半。
随后,计算了一个值 emb ,并将其转换为 Tensor 类型进行保存。
在 construct 方法(即构建方法)中:
计算了 emb ,它是输入 x 与 self.emb 的乘积。
通过 concat 函数将 sin(emb) 和 cos(emb) 沿着最后一个维度进行连接,最终返回所得结果。
总体而言,这个类的主要功能是依据输入的 x 以及预先定义的参数来进行计算,并返回一种位置编码。
代码如下:
class SinusoidalPositionEmbeddings(nn.Cell): def __init__(self, dim): super().__init__() self.dim = dim half_dim = self.dim // 2 emb = math.log(10000) / (half_dim - 1) emb = np.exp(np.arange(half_dim) * - emb) self.emb = Tensor(emb, ms.float32) def construct(self, x): emb = x[:, None] * self.emb[None, :] emb = ops.concat((ops.sin(emb), ops.cos(emb)), axis=-1) return emb
ResNet/ConvNeXT块
首先,定义了一个名为 Block 的类,它继承自 nn.Cell 。
在 Block 类的 __init__ 方法(构造函数)中:
初始化了一个卷积层 self.proj 用于进行维度转换。
初始化了一个组归一化层 self.norm 。
初始化了一个激活函数 self.act 为 SiLU 。
在 Block 类的 construct 方法(构建方法)中:
对输入 x 进行卷积操作。
对卷积结果进行归一化。
如果存在 scale_shift ,则对结果进行相应的调整。
最后应用激活函数并返回结果。
接下来,定义了一个名为 ConvNextBlock 的类,同样继承自 nn.Cell 。
在 ConvNextBlock 类的 __init__ 方法中:
如果存在 time_emb_dim ,创建一个包含 GELU 和 Dense 层的多层感知机 self.mlp 。
创建一个深度可分离卷积层 self.ds_conv 。
创建一个包含多个层的序列 self.net ,包括归一化、卷积、激活函数和再次归一化、卷积等操作。
创建一个用于处理输入和输出维度不一致的卷积层 self.res_conv 。
在 ConvNextBlock 类的 construct 方法中:
对输入 x 进行深度可分离卷积操作。
如果存在 mlp 和 time_emb ,将条件信息添加到卷积结果中。
对结果进行 self.net 中的一系列操作。
将结果与经过 self.res_conv 处理的输入相加并返回。
总的来说,这两个类可能用于构建神经网络中的特定模块,实现特定的卷积和处理逻辑。代码如下:
class Block(nn.Cell): def __init__(self, dim, dim_out, groups=1): super().__init__() self.proj = nn.Conv2d(dim, dim_out, 3, pad_mode="pad", padding=1) self.proj = c(dim, dim_out, 3, padding=1, pad_mode='pad') self.norm = nn.GroupNorm(groups, dim_out) self.act = nn.SiLU() def construct(self, x, scale_shift=None): x = self.proj(x) x = self.norm(x) if exists(scale_shift): scale, shift = scale_shift x = x * (scale + 1) + shift x = self.act(x) return x class ConvNextBlock(nn.Cell): def __init__(self, dim, dim_out, *, time_emb_dim=None, mult=2, norm=True): super().__init__() self.mlp = ( nn.SequentialCell(nn.GELU(), nn.Dense(time_emb_dim, dim)) if exists(time_emb_dim) else None ) self.ds_conv = nn.Conv2d(dim, dim, 7, padding=3, group=dim, pad_mode="pad") self.net = nn.SequentialCell( nn.GroupNorm(1, dim) if norm else nn.Identity(), nn.Conv2d(dim, dim_out * mult, 3, padding=1, pad_mode="pad"), nn.GELU(), nn.GroupNorm(1, dim_out * mult), nn.Conv2d(dim_out * mult, dim_out, 3, padding=1, pad_mode="pad"), ) self.res_conv = nn.Conv2d(dim, dim_out, 1) if dim != dim_out else nn.Identity() def construct(self, x, time_emb=None): h = self.ds_conv(x) if exists(self.mlp) and exists(time_emb): assert exists(time_emb), "time embedding must be passed in" condition = self.mlp(time_emb) condition = condition.expand_dims(-1).expand_dims(-1) h = h + condition h = self.net(h) return h + self.res_conv(x)
Attention模块
Attention 类:
__init__ 方法:
初始化一些参数,如缩放因子 scale 、头的数量 heads 、每个头的维度 dim_head 。
创建用于将输入转换为查询(q)、键(k)和值(v)的卷积层 to_qkv ,以及将处理后的结果转换回输出维度的卷积层 to_out 。
定义了一些操作函数 map 和 partial 。
construct 方法:
获取输入的形状信息。
将 to_qkv 的输出拆分为 q、k、v 。
对 q 进行缩放。
计算 q 和 k 的相似度矩阵 sim ,并进行 softmax 操作得到注意力权重 attn 。
根据 attn 计算输出 out ,并将其重新整形。
最后通过 to_out 卷积层得到最终的输出。
LayerNorm 类:
__init__ 方法:创建一个可学习的参数 g 。
construct 方法:计算输入的方差 var 和均值 mean ,进行归一化处理并乘以 g 。
LinearAttention 类:
与 Attention 类类似的初始化参数和层。
construct 方法:
获取输入形状。
处理得到 q、k、v 。
对 q 和 k 进行 softmax 操作。
计算上下文 context 和输出 out 。
将 out 重新整形,并通过 to_out 序列中的层得到最终输出。
总的来说,定义了三种与注意力机制相关的类,用于在神经网络中进行特征的处理和转换
代码如下:
class Attention(nn.Cell): def __init__(self, dim, heads=4, dim_head=32): super().__init__() self.scale = dim_head ** -0.5 self.heads = heads hidden_dim = dim_head * heads self.to_qkv = nn.Conv2d(dim, hidden_dim * 3, 1, pad_mode='valid', has_bias=False) self.to_out = nn.Conv2d(hidden_dim, dim, 1, pad_mode='valid', has_bias=True) self.map = ops.Map() self.partial = ops.Partial() def construct(self, x): b, _, h, w = x.shape qkv = self.to_qkv(x).chunk(3, 1) q, k, v = self.map(self.partial(rearrange, self.heads), qkv) q = q * self.scale # 'b h d i, b h d j -> b h i j' sim = ops.bmm(q.swapaxes(2, 3), k) attn = ops.softmax(sim, axis=-1) # 'b h i j, b h d j -> b h i d' out = ops.bmm(attn, v.swapaxes(2, 3)) out = out.swapaxes(-1, -2).reshape((b, -1, h, w)) return self.to_out(out) class LayerNorm(nn.Cell): def __init__(self, dim): super().__init__() self.g = Parameter(initializer('ones', (1, dim, 1, 1)), name='g') def construct(self, x): eps = 1e-5 var = x.var(1, keepdims=True) mean = x.mean(1, keep_dims=True) return (x - mean) * rsqrt((var + eps)) * self.g class LinearAttention(nn.Cell): def __init__(self, dim, heads=4, dim_head=32): super().__init__() self.scale = dim_head ** -0.5 self.heads = heads hidden_dim = dim_head * heads self.to_qkv = nn.Conv2d(dim, hidden_dim * 3, 1, pad_mode='valid', has_bias=False) self.to_out = nn.SequentialCell( nn.Conv2d(hidden_dim, dim, 1, pad_mode='valid', has_bias=True), LayerNorm(dim) ) self.map = ops.Map() self.partial = ops.Partial() def construct(self, x): b, _, h, w = x.shape qkv = self.to_qkv(x).chunk(3, 1) q, k, v = self.map(self.partial(rearrange, self.heads), qkv) q = ops.softmax(q, -2) k = ops.softmax(k, -1) q = q * self.scale v = v / (h * w) # 'b h d n, b h e n -> b h d e' context = ops.bmm(k, v.swapaxes(2, 3)) # 'b h d e, b h d n -> b h e n' out = ops.bmm(context.swapaxes(2, 3), q) out = out.reshape((b, -1, h, w)) return self.to_out(out)
组规一化
定义了一个名为 PreNorm 的类,它继承自 nn.Cell 。
在 __init__ 方法(构造函数)中:
接收两个参数:dim(可能表示维度)和 fn(可能是一个函数或其他可调用对象)。
初始化了一个属性 fn 来保存传入的函数。
创建了一个组归一化层 self.norm 。
在 construct 方法(构建方法)中:
首先对输入 x 进行组归一化操作。
然后将归一化后的 x 传入保存的函数 fn 并返回其结果。
总的来说,这个类的作用是在对输入进行特定函数处理之前先进行组归一化操作。
代码如下:
class PreNorm(nn.Cell): def __init__(self, dim, fn): super().__init__() self.fn = fn self.norm = nn.GroupNorm(1, dim) def construct(self, x): x = self.norm(x) return self.fn(x)
条件U-Net
定义了一个名为 Unet 的类,它是一个基于卷积神经网络的模型。
__init__ 方法用于初始化模型的参数和构建模型的各个部分,包括初始卷积层、时间嵌入处理、下采样和上采样的模块等。
construct 方法定义了模型的前向传播过程。首先进行初始卷积,然后通过下采样、中间处理和上采样等步骤对输入进行处理,最终通过最终的卷积层得到输出。
例如,如果输入图像为 [batch_size, channels, height, width] 的张量,并且提供了时间信息,模型会按照上述的流程对输入进行处理,逐步提取特征并生成最终的输出。
代码如下:
# 定义 Unet 类,继承自 `nn.Cell`
class Unet(nn.Cell): def __init__( self, dim, init_dim=None, out_dim=None, dim_mults=(1, 2, 4, 8), channels=3, with_time_emb=True, convnext_mult=2, ): # 调用父类的初始化方法 super().__init__() # 保存输入图像的通道数 self.channels = channels # 如果未提供 init_dim,计算默认值 init_dim = default(init_dim, dim // 3 * 2) # 初始化卷积层,用于将输入的图像通道转换为 init_dim self.init_conv = nn.Conv2d(channels, init_dim, 7, padding=3, pad_mode="pad", has_bias=True) # 计算不同层次的维度 dims = [init_dim, *map(lambda m: dim * m, dim_mults)] in_out = list(zip(dims[:-1], dims[1:])) # 创建部分应用的 ConvNextBlock 类 block_klass = partial(ConvNextBlock, mult=convnext_mult) # 如果启用时间嵌入 if with_time_emb: # 计算时间嵌入的维度 time_dim = dim * 4 # 创建时间嵌入的多层感知机 self.time_mlp = nn.SequentialCell( SinusoidalPositionEmbeddings(dim), nn.Dense(dim, time_dim), nn.GELU(), nn.Dense(time_dim, time_dim), ) else: # 否则,将时间嵌入相关设置为 None time_dim = None self.time_mlp = None # 创建用于下采样的单元格列表 self.downs = nn.CellList([]) # 创建用于上采样的单元格列表 self.ups = nn.CellList([]) num_resolutions = len(in_out) # 构建下采样部分 for ind, (dim_in, dim_out) in enumerate(in_out): is_last = ind >= (num_resolutions - 1) self.downs.append( nn.CellList( [ block_klass(dim_in, dim_out, time_emb_dim=time_dim), block_klass(dim_out, dim_out, time_emb_dim=time_dim), Residual(PreNorm(dim_out, LinearAttention(dim_out))), Downsample(dim_out) if not is_last else nn.Identity(), ] ) ) # 中间块部分 mid_dim = dims[-1] self.mid_block1 = block_klass(mid_dim, mid_dim, time_emb_dim=time_dim) self.mid_attn = Residual(PreNorm(mid_dim, Attention(mid_dim))) self.mid_block2 = block_klass(mid_dim, mid_dim, time_emb_dim=time_dim) # 构建上采样部分 for ind, (dim_in, dim_out) in enumerate(reversed(in_out[1:])): is_last = ind >= (num_resolutions - 1) self.ups.append( nn.CellList( [ block_klass(dim_out * 2, dim_in, time_emb_dim=time_dim), block_klass(dim_in, dim_in, time_emb_dim=time_dim), Residual(PreNorm(dim_in, LinearAttention(dim_in))), Upsample(dim_in) if not is_last else nn.Identity(), ] ) ) # 如果未提供 out_dim,计算默认值 out_dim = default(out_dim, channels) # 最终的卷积层 self.final_conv = nn.SequentialCell( block_klass(dim, dim), nn.Conv2d(dim, out_dim, 1) ) def construct(self, x, time): # 对输入进行初始卷积 x = self.init_conv(x) # 获取时间嵌入 t = self.time_mlp(time) if exists(self.time_mlp) else None # 存储下采样过程中的中间结果 h = [] # 下采样过程 for block1, block2, attn, downsample in self.downs: x = block1(x, t) x = block2(x, t) x = attn(x) h.append(x) x = downsample(x) # 中间处理 x = self.mid_block1(x, t) x = self.mid_attn(x) x = self.mid_block2(x, t) len_h = len(h) - 1 # 上采样过程 for block1, block2, attn, upsample in self.ups: x = ops.concat((x, h[len_h]), 1) len_h -= 1 x = block1(x, t) x = block2(x, t) x = attn(x) x = upsample(x) # 最终的卷积输出 return self.final_conv(x)
正向扩散
定义了生成线性 beta 调度的函数,并根据指定的步数生成了 betas。
基于 betas 计算了相关的参数,如 alphas、alphas_cumprod 等,并将一些结果转换为 Tensor 类型。
定义了用于提取特定位置值的函数 extract。
下载了一个包含猫猫图像的压缩文件,并打开并调整了其中一张图像的大小,最后显示了该图像。
代码如下:
# 定义一个函数用于生成线性的 beta 调度
def linear_beta_schedule(timesteps): # 定义起始和结束的 beta 值 beta_start = 0.0001 beta_end = 0.02 # 使用线性插值生成指定步数的 beta 值,并转换为 float32 类型 return np.linspace(beta_start, beta_end, timesteps).astype(np.float32) # 设置扩散的步数
timesteps = 200 # 调用函数定义 beta 调度
betas = linear_beta_schedule(timesteps=timesteps) # 计算 alphas
alphas = 1. - betas
# 计算累积乘积的 alphas
alphas_cumprod = np.cumprod(alphas, axis=0)
# 对 alphas_cumprod 进行填充处理
alphas_cumprod_prev = np.pad(alphas_cumprod[:-1], (1, 0), constant_values=1) # 将一些计算结果转换为 Tensor 类型
sqrt_recip_alphas = Tensor(np.sqrt(1. / alphas))
sqrt_alphas_cumprod = Tensor(np.sqrt(alphas_cumprod))
sqrt_one_minus_alphas_cumprod = Tensor(np.sqrt(1. - alphas_cumprod)) # 计算后验方差
posterior_variance = betas * (1. - alphas_cumprod_prev) / (1. - alphas_cumprod) # 计算 p2_loss_weight
p2_loss_weight = (1 + alphas_cumprod / (1 - alphas_cumprod)) ** -0.
p2_loss_weight = Tensor(p2_loss_weight) # 定义一个函数用于提取特定位置的值
def extract(a, t, x_shape): b = t.shape[0] # 从 a 中根据 t 提取值 out = Tensor(a).gather(t, -1) # 对提取的值进行形状调整 return out.reshape(b, *((1,) * (len(x_shape) - 1))) # 下载猫猫图像
url = 'https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/datasets/image_cat.zip'
path = download(url, './', kind="zip", replace=True)
from PIL import Image # 打开图像
image = Image.open('./image_cat/jpg/000000039769.jpg')
# 调整图像大小
base_width = 160
image = image.resize((base_width, int(float(image.size[1]) * float(base_width / float(image.size[0])))))
# 显示图像
image.show()
运行结果:
定义了一系列对图像的处理变换操作,包括调整大小、裁剪、转换为张量、数值缩放以及随机水平翻转。
从指定路径加载图像文件夹数据集,并对其进行处理和分批。
获取一批数据并打印其形状。
定义了反向变换的操作和组合变换的函数,对获取的数据进行反向变换并显示图像。
例如,通过调整大小和裁剪将图像统一为 128x128 的尺寸,然后进行随机水平翻转、数值处理等,最后可以将处理后的数据再通过反向变换恢复并显示出来,以便观察处理前后的效果。
代码如下:
from mindspore.dataset import ImageFolderDataset # 定义图像的处理变换
image_size = 128
transforms = [ Resize(image_size, Inter.BILINEAR), CenterCrop(image_size), ToTensor(), lambda t: (t * 2) - 1
] # 图像数据集的路径
path = './image_cat' # 创建 ImageFolderDataset 对象
dataset = ImageFolderDataset(dataset_dir=path, num_parallel_workers=cpu_count(), extensions=['.jpg', '.jpeg', '.png', '.tiff'], num_shards=1, shard_id=0, shuffle=False, decode=True) # 只选择 'image' 列
dataset = dataset.project('image') # 在变换列表中插入随机水平翻转
transforms.insert(1, RandomHorizontalFlip()) # 应用变换并将数据集分批处理
dataset_1 = dataset.map(transforms, 'image')
dataset_2 = dataset_1.batch(1, drop_remainder=True) # 获取一批数据中的第一个元素
x_start = next(dataset_2.create_tuple_iterator())[0]
print(x_start.shape) import numpy as np # 定义反向变换的操作
reverse_transform = [ lambda t: (t + 1) / 2, lambda t: ops.permute(t, (1, 2, 0)), # 从 CHW 转换为 HWC lambda t: t * 255., lambda t: t.asnumpy().astype(np.uint8), ToPIL()
] # 定义组合变换的函数
def compose(transform, x): for d in transform: x = d(x) return x # 对获取的数据进行反向变换并显示
reverse_image = compose(reverse_transform, x_start[0])
reverse_image.show()
运行结果:
q_sample 函数用于根据输入的起始图像、时间步和可能的噪声生成带噪的样本。
get_noisy_image 函数通过调用 q_sample 得到带噪图像,并将其转换为 PIL 图像。
plot 函数用于绘制图像,支持设置是否包含原始图像以及行标题等。
p_losses 函数计算模型的损失,涉及生成带噪样本、获取模型预测的噪声、计算损失并进行一些处理。
例如,通过设置不同的时间步 t ,可以观察到不同程度噪声的图像效果。而 p_losses 函数用于在训练过程中计算损失,以优化模型。
代码如下:
# 定义 q_sample 函数,用于根据起始图像、时间步和噪声生成带噪样本
def q_sample(x_start, t, noise=None): if noise is None: noise = randn_like(x_start) return (extract(sqrt_alphas_cumprod, t, x_start.shape) * x_start + extract(sqrt_one_minus_alphas_cumprod, t, x_start.shape) * noise) # 定义 get_noisy_image 函数,获取添加噪声后的图像并转换为 PIL 图像
def get_noisy_image(x_start, t): # 添加噪音 x_noisy = q_sample(x_start, t=t) # 转换为 PIL 图像 noisy_image = compose(reverse_transform, x_noisy[0]) return noisy_image # 设置时间步
t = Tensor([40])
noisy_image = get_noisy_image(x_start, t)
print(noisy_image)
noisy_image.show() import matplotlib.pyplot as plt # 定义 plot 函数用于绘制图像
def plot(imgs, with_orig=False, row_title=None, **imshow_kwargs): if not isinstance(imgs[0], list): imgs = [imgs] num_rows = len(imgs) num_cols = len(imgs[0]) + with_orig _, axs = plt.subplots(figsize=(200, 200), nrows=num_rows, ncols=num_cols, squeeze=False) for row_idx, row in enumerate(imgs): row = [image] + row if with_orig else row for col_idx, img in enumerate(row): ax = axs[row_idx, col_idx] ax.imshow(np.asarray(img), **imshow_kwargs) ax.set(xticklabels=[], yticklabels=[], xticks=[], yticks=[]) if with_orig: axs[0, 0].set(title='Original image') axs[0, 0].title.set_size(8) if row_title is not None: for row_idx in range(num_rows): axs[row_idx, 0].set(ylabel=row_title[row_idx]) plt.tight_layout() # 调用 plot 函数绘制多个时间步的噪声图像 plot([get_noisy_image(x_start, Tensor([t])) for t in [0, 50, 100, 150, 199]]) # 定义 p_losses 函数计算损失
def p_losses(unet_model, x_start, t, noise=None): if noise is None: noise = randn_like(x_start) x_noisy = q_sample(x_start=x_start, t=t, noise=noise) predicted_noise = unet_model(x_noisy, t) loss = nn.SmoothL1Loss()(noise, predicted_noise) # todo loss = loss.reshape(loss.shape[0], -1) loss = loss * extract(p2_loss_weight, t, loss.shape) return loss.mean()
运行结果:
数据准备与处理
下载 MNIST 数据集的压缩文件。
从 mindspore.dataset 中导入 FashionMnistDataset 类。
定义了一些数据集的参数,如图像尺寸、通道数、批大小等。
从指定路径创建用于训练的 FashionMnistDataset 对象,并设置了一些数据加载的参数,如是否打乱、并行处理数量等。
定义了数据变换操作,包括随机水平翻转、转换为张量和数值处理。
对数据集进行了选择列、打乱、应用变换和分批处理。
获取一批数据并打印其键。
例如,通过这些操作,对 MNIST 数据集进行了预处理和分批,为后续的模型训练或数据处理做好了准备。打印键可以查看这批数据中包含的字段信息。
代码如下:
# 下载 MNIST 数据集
url = 'https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/datasets/dataset.zip'
path = download(url, './', kind="zip", replace=True) from mindspore.dataset import FashionMnistDataset # 定义图像的尺寸、通道数和批大小
image_size = 28
channels = 1
batch_size = 16 # 数据集的路径
fashion_mnist_dataset_dir = "./dataset" # 创建 FashionMnistDataset 对象用于训练数据
dataset = FashionMnistDataset(dataset_dir=fashion_mnist_dataset_dir, usage="train", num_parallel_workers=cpu_count(), shuffle=True, num_shards=1, shard_id=0) # 定义数据的变换操作
transforms = [ RandomHorizontalFlip(), ToTensor(), lambda t: (t * 2) - 1
] # 只选择 'image' 列
dataset = dataset.project('image')
# 对数据集进行随机打乱
dataset = dataset.shuffle(64)
# 应用变换操作
dataset = dataset.map(transforms, 'image')
# 将数据集分批处理
dataset = dataset.batch(16, drop_remainder=True) # 获取一批数据并打印其键
x = next(dataset.create_dict_iterator())
print(x.keys())
运行结果:
p_sample 函数根据模型输出、输入数据、时间步等计算采样结果,在不同的时间步索引下有不同的处理逻辑。
p_sample_loop 函数通过一个循环多次调用 p_sample ,从纯噪声开始逐步生成采样结果,并将每次的结果保存到 imgs 列表中。
sample 函数用于设置采样的形状参数,并调用 p_sample_loop 进行采样操作。
例如,在模型训练完成后,可以使用 sample 函数生成采样图像,通过逐步调整时间步来观察生成的效果。
代码如下:
# 定义 p_sample 函数,用于根据模型预测、输入图像、时间步和时间步索引生成采样结果
def p_sample(model, x, t, t_index): # 提取相关的参数 betas_t = extract(betas, t, x.shape) sqrt_one_minus_alphas_cumprod_t = extract( sqrt_one_minus_alphas_cumprod, t, x.shape ) sqrt_recip_alphas_t = extract(sqrt_recip_alphas, t, x.shape) # 计算模型均值 model_mean = sqrt_recip_alphas_t * (x - betas_t * model(x, t) / sqrt_one_minus_alphas_cumprod_t) if t_index == 0: return model_mean # 提取后验方差 posterior_variance_t = extract(posterior_variance, t, x.shape) # 生成噪声 noise = randn_like(x) return model_mean + ops.sqrt(posterior_variance_t) * noise # 定义 p_sample_loop 函数,通过循环进行多次 p_sample 操作
def p_sample_loop(model, shape): b = shape[0] # 从纯噪声开始 img = randn(shape, dtype=None) imgs = [] for i in tqdm(reversed(range(0, timesteps)), desc='sampling loop time step', total=timesteps): img = p_sample(model, img, ms.numpy.full((b,), i, dtype=mstype.int32), i) imgs.append(img.asnumpy()) return imgs # 定义 sample 函数,用于调用 p_sample_loop 进行采样
def sample(model, image_size, batch_size=16, channels=3): return p_sample_loop(model, shape=(batch_size, channels, image_size, image_size))
训练过程
定义了一个动态变化的学习率 lr。
构建了 Unet 模型,并处理了模型参数的名称。
定义了优化器 optimizer 和损失缩放器 loss_scaler。
定义了前向计算函数 forward_fn、梯度计算函数 grad_fn 和梯度更新函数 train_step。
在训练循环中,每次迭代从数据集中获取一批数据,生成随机的时间步和噪声,计算损失并进行梯度更新。
按照一定的间隔打印训练过程中的损失信息。
在每个 epoch 结束时,计算训练时间,并展示模型的随机采样效果。
例如,通过不断迭代训练数据,模型的参数会根据梯度进行调整,以优化模型的性能。同时,通过观察损失的变化和采样效果,可以评估模型的训练进展和效果。
代码如下:
# 定义动态学习率
lr = nn.cosine_decay_lr(min_lr=1e-7, max_lr=1e-4, total_step=10*3750, step_per_epoch=3750, decay_epoch=10) # 定义 Unet 模型
unet_model = Unet( dim=image_size, channels=channels, dim_mults=(1, 2, 4,)
) name_list = []
for (name, par) in list(unet_model.parameters_and_names()): name_list.append(name)
i = 0
for item in list(unet_model.trainable_params()): item.name = name_list[i] i += 1 # 定义优化器
optimizer = nn.Adam(unet_model.trainable_params(), learning_rate=lr)
loss_scaler = DynamicLossScaler(65536, 2, 1000) # 定义前向过程
def forward_fn(data, t, noise=None): loss = p_losses(unet_model, data, t, noise) return loss # 计算梯度
grad_fn = ms.value_and_grad(forward_fn, None, optimizer.parameters, has_aux=False) # 梯度更新
def train_step(data, t, noise): loss, grads = grad_fn(data, t, noise) optimizer(grads) return loss import time # 由于时间原因,epochs 设置为 1,可根据需求进行调整
epochs = 1 for epoch in range(epochs): begin_time = time.time() for step, batch in enumerate(dataset.create_tuple_iterator()): unet_model.set_train() batch_size = batch[0].shape[0] t = randint(0, timesteps, (batch_size,), dtype=ms.int32) noise = randn_like(batch[0]) loss = train_step(batch[0], t, noise) if step % 500 == 0: print(" epoch: ", epoch, " step: ", step, " Loss: ", loss) end_time = time.time() times = end_time - begin_time print("training time:", times, "s") # 展示随机采样效果 unet_model.set_train(False) samples = sample(unet_model, image_size=image_size, batch_size=64, channels=channels) plt.imshow(samples[-1][5].reshape(image_size, image_size, channels), cmap="gray")
print("Training Success!")
运行结果:
推理过程
首先对模型设置为评估模式,并进行采样得到 64 个图像的结果。
展示了采样结果中最后一个图像的第 5 个图像的效果。
然后选择了一个新的随机索引 53 。
通过循环创建了一系列图像,并将它们存储在 ims 列表中。
使用 matplotlib.animation 库创建了一个动画,展示每个时间步对应的采样图像。
将动画保存为 diffusion.gif 并显示出来。
例如,通过这个动画,可以直观地观察到随着时间步的变化,采样图像的演变过程。
代码如下:
# 采样 64 个图片
unet_model.set_train(False)
samples = sample(unet_model, image_size=image_size, batch_size=64, channels=channels) # 展示一个随机效果
random_index = 5
plt.imshow(samples[-1][random_index].reshape(image_size, image_size, channels), cmap="gray") import matplotlib.animation as animation random_index = 53 fig = plt.figure()
ims = [] for i in range(timesteps): # 显示每个时间步对应的图像 im = plt.imshow(samples[i][random_index].reshape(image_size, image_size, channels), cmap="gray", animated=True) ims.append([im]) # 创建动画
animate = animation.ArtistAnimation(fig, ims, interval=50, blit=True, repeat_delay=100)
# 保存动画为 'diffusion.gif'
animate.save('diffusion.gif')
plt.show()
运行结果: