一、准备数据
1.数据集下载
由于数据集并没有进行分类等操作,故需先手动处理。
2.处理数据
数据集并不存在验证集,故从训练集移动百分之十的数据作为验证集,并将原始数据集整理成只包含单一种类的文件夹。
import shutil
import os
def get_address():
    """Return (dog filename list, cat filename list, working directory).

    Scans ./train/ and partitions the filenames by their 'dog'/'cat' prefix.
    """
    data_file = os.listdir('./train/')
    print('图片或文件数量:', str(len(data_file)))  # expected 25000 for the full dataset
    dog_file = list(filter(lambda x: x[:3] == 'dog', data_file))
    cat_file = list(filter(lambda x: x[:3] == 'cat', data_file))
    print('狗:', str(len(dog_file)), '\n猫:', str(len(cat_file)))  # expected 12500 / 12500
    root = os.getcwd()
    return dog_file, cat_file, root


def arrange():
    """Rearrange the data on disk: keep 90% of each class under train/<class>
    and move the remaining 10% to val/<class>."""
    dog_file, cat_file, root = get_address()
    print('开始数据整理')
    # Create the per-class train/val folders; exist_ok replaces the
    # original try/except FileExistsError: pass dance.
    for species in ('dog', 'cat'):
        for split in ('train', 'val'):
            os.makedirs(os.path.join(root, split, species), exist_ok=True)

    def _move(files, species):
        # First 90% of the listing stays in train/<species>; the rest goes to val/<species>.
        cutoff = 0.9 * len(files)
        for i, name in enumerate(files):
            src = os.path.join(root, 'train', name)
            split = 'train' if i < cutoff else 'val'
            shutil.move(src, os.path.join(root, split, species))

    _move(dog_file, 'dog')
    _move(cat_file, 'cat')
    print('数据整理完成')


if __name__ == '__main__':
    # Guarded so importing this module no longer triggers the (destructive) move.
    arrange()
处理完成后:
二、训练模型
1.先对数据集进行加载:
def get_data(input_size, batch_size):
    """Build the train/val datasets and wrap them in DataLoaders."""
    # Training pipeline: random crop + horizontal flip as augmentation.
    transform_train = transforms.Compose([
        transforms.RandomResizedCrop(input_size),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
    ])
    # ImageFolder derives integer targets (0/1) from the sub-folder names,
    # not the strings 'cat'/'dog'.
    train_set = ImageFolder('train', transform=transform_train)
    train_loader = DataLoader(dataset=train_set, batch_size=batch_size, shuffle=True)

    # Validation pipeline: deterministic resize only — no random augmentation.
    transform_val = transforms.Compose([
        transforms.Resize([input_size, input_size]),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
    ])
    val_set = ImageFolder('val', transform=transform_val)
    val_loader = DataLoader(dataset=val_set, batch_size=batch_size, shuffle=False)

    return transform_train, train_set, train_loader, transform_val, val_set, val_loader
transforms.RandomResizedCrop(input_size):随机裁剪并调整图像到指定大小,这有助于增强模型的鲁棒性。
transforms.RandomHorizontalFlip():随机水平翻转图像,进一步增强模型的泛化能力。
transforms.ToTensor():将图像从PIL格式转换为张量格式,并将像素值归一化到 [0, 1]。
transforms.Normalize(mean, std):根据给定的均值和标准差对图像进行标准化。这里 mean 和 std 均为 0.5,会把像素值从 [0, 1] 线性映射到 [-1, 1],使输入以 0 为中心、尺度统一(注意这并不等于严格的标准正态分布),从而有利于训练的稳定性。
验证集通常不需要随机变换。
2.模型:
直接微调resnet18模型
# Load an ImageNet-pretrained backbone; swap in a larger model for harder datasets.
transfer_model = models.resnet18(pretrained=True)
# Freeze every pretrained parameter so only the new classifier head is trained:
# the convolutional layers already encode useful generic features.
for param in transfer_model.parameters():
    param.requires_grad = False
dim = transfer_model.fc.in_features  # input width of the original fully-connected head
transfer_model.fc = nn.Linear(dim, 2)  # new 2-way head: cat vs dog
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
net = transfer_model.to(device)
criterion = nn.CrossEntropyLoss()  # cross-entropy suits multi-class classification
# Plain SGD over the head's parameters only — everything else is frozen.
optimizer = torch.optim.SGD(net.fc.parameters(), lr=lr)
3.训练
#训练函数
def train(net, optimizer, device, criterion, train_loader, epoch, writer):
    """Run one training epoch and log the mean loss every 10 batches to TensorBoard."""
    net.train()
    running_loss = 0.0
    batch_num = len(train_loader)
    # start=1 makes the log read "batch 1/…" instead of "batch 0/…".
    for batch_idx, (inputs, labels) in enumerate(train_loader, start=1):
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = net(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        if batch_idx % 10 == 0:
            # Mean loss of the last 10 batches, then reset the accumulator.
            avg_loss = running_loss / 10
            print('Epoch: {}, batch:{}/{} loss:{:.3f}'.format(epoch + 1, batch_idx, batch_num, avg_loss))
            writer.add_scalar('Training Loss', avg_loss, epoch * batch_num + batch_idx)
            running_loss = 0.0
    # Persist a checkpoint once per epoch.
    torch.save(net.state_dict(), f'{save_path}_epoch_{epoch+1}.pth')
    print(f'Model saved after epoch {epoch+1}')
#评估函数
def validate(net, device, val_loader, epoch, writer):
    """Accumulate correct/total prediction counts over the validation set.

    NOTE(review): in this excerpt `epoch` and `writer` are unused — the accuracy
    computation and logging appear only in the full script further below.
    """
    net.eval()
    correct = 0
    total = 0
    # Inference only — no gradients needed.
    with torch.no_grad():
        for images, labels in val_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = net(images)
            # Predicted class = index of the max logit along the class axis.
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
为了方便观察,使用了tensorboard来记录,完整代码如下:
import torch
from torch.utils.tensorboard import SummaryWriter
from torchvision import transforms, models
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader
from torch import nn
#from setting import input_size, batch_size, save_path, lr, n_epoch
"""Tunable training settings."""
input_size = 224  # crop size (pixels) fed to the network
batch_size = 128  # samples per training step (directly drives GPU memory use)
save_path = './weights.pt'  # checkpoint path prefix for saved weights
lr = 1e-3  # learning rate
n_epoch = 10  # number of training epochs


def get_data(input_size, batch_size):
    """Build the train/val datasets and loaders.

    Returns (transform_train, train_set, train_loader,
             transform_val, val_set, val_loader).
    """
    # Training pipeline: random crop + horizontal flip as augmentation.
    transform_train = transforms.Compose([
        transforms.RandomResizedCrop(input_size),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
    ])
    # ImageFolder assigns integer targets from sub-folder names (alphabetical order).
    train_set = ImageFolder('train', transform=transform_train)
    train_loader = DataLoader(dataset=train_set, batch_size=batch_size, shuffle=True)
    # Validation pipeline: deterministic resize only.
    transform_val = transforms.Compose([
        transforms.Resize([input_size, input_size]),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
    ])
    val_set = ImageFolder('val', transform=transform_val)
    val_loader = DataLoader(dataset=val_set, batch_size=batch_size, shuffle=False)
    return transform_train, train_set, train_loader, transform_val, val_set, val_loader


def train(net, optimizer, device, criterion, train_loader, epoch, writer):
    """Run one training epoch; log mean loss every 10 batches; checkpoint at epoch end."""
    net.train()
    running_loss = 0.0
    batch_num = len(train_loader)
    # start=1 so progress prints as "batch 1/…" rather than "batch 0/…".
    for i, (inputs, labels) in enumerate(train_loader, start=1):
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = net(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()
        if i % 10 == 0:
            avg_loss = running_loss / 10
            print('Epoch: {}, batch:{}/{} loss:{:.3f}'.format(epoch + 1, i, batch_num, avg_loss))
            writer.add_scalar('Training Loss', avg_loss, epoch * batch_num + i)
            running_loss = 0.0
    # Save a checkpoint once per epoch.
    torch.save(net.state_dict(), f'{save_path}_epoch_{epoch+1}.pth')
    print(f'Model saved after epoch {epoch+1}')


def validate(net, device, val_loader, epoch, writer):
    """Compute and log validation accuracy for the given epoch."""
    net.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in val_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = net(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    accuracy = 100 * correct / total
    # FIX: the original string literal was broken across a raw newline; rejoined here.
    print('Validation Accuracy after epoch {}: {:.2f} %'.format(epoch + 1, accuracy))
    writer.add_scalar('Validation Accuracy', accuracy, epoch + 1)


if __name__ == '__main__':
    writer = SummaryWriter(log_dir='./logs')  # TensorBoard logging
    transfer_model = models.resnet18(pretrained=True)  # ImageNet-pretrained backbone
    # Freeze the pretrained layers; only the new classifier head will learn.
    for param in transfer_model.parameters():
        param.requires_grad = False
    dim = transfer_model.fc.in_features
    transfer_model.fc = nn.Linear(dim, 2)  # 2-way head: cat vs dog
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    net = transfer_model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(net.fc.parameters(), lr=lr)  # optimize the head only
    transform_train, train_set, train_loader, transform_val, val_set, val_loader = get_data(input_size, batch_size)
    # Log the model graph to TensorBoard using one real batch.
    sample_inputs, _ = next(iter(train_loader))
    writer.add_graph(net, sample_inputs.to(device))
    for epoch in range(n_epoch):
        print('Epoch {}/{}'.format(epoch + 1, n_epoch))
        train(net, optimizer, device, criterion, train_loader, epoch, writer)
        validate(net, device, val_loader, epoch, writer)
    torch.save(net.state_dict(), save_path)
    writer.close()  # flush and close TensorBoard
三、结果
查看tensorboard日志
这里只训练了十个epoch,有需要的可以自行修改
四、可视化
使用gradio进行可视化,地址
代码如下:
import gradio as gr
def test(File):
    """Classify the image at path *File* as cat or dog with the fine-tuned ResNet-18.

    Returns a Chinese result string for display in the Gradio UI.
    """
    from PIL import Image
    import torch
    from torchvision import models
    from torch import nn
    from setting import input_size, save_path
    from torchvision import transforms
    # ------------------------ preprocessing ------------------------ #
    # Validation-style transform; note Resize takes a 2-D size, unlike RandomResizedCrop.
    transform_val = transforms.Compose([
        transforms.Resize([input_size, input_size]),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
    ])
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    # ------------------------ model ------------------------ #
    transfer_model = models.resnet18(pretrained=True)
    for param in transfer_model.parameters():
        param.requires_grad = False
    dim = transfer_model.fc.in_features
    transfer_model.fc = nn.Linear(dim, 2)
    net = transfer_model.to(device)
    # FIX: map_location lets a checkpoint saved on GPU load on a CPU-only machine.
    net.load_state_dict(torch.load('weights.pt_epoch_10.pth', map_location=device))
    net.eval()
    # ------------------------ inference ------------------------ #
    image_PIL = Image.open(File).convert('RGB')
    image_tensor = transform_val(image_PIL)
    # Add the batch dimension (equivalent to torch.unsqueeze(image_tensor, 0));
    # the model requires a 4-D input.
    image_tensor.unsqueeze_(0)
    image_tensor = image_tensor.to(device)
    out = net(image_tensor)
    # Class probabilities as percentages; index 0 is 'cat', 1 is 'dog'
    # per ImageFolder's alphabetical class ordering at training time.
    # (The original also sorted the logits here, but never used the result — removed.)
    percentage = torch.nn.functional.softmax(out, dim=1)[0] * 100
    if percentage[0] > percentage[1]:
        out = '此图片是只猫'
    else:
        out = '此图片是只狗'
    return out
def process_image(image):
    """Bridge Gradio's PIL image input to the path-based prediction function."""
    # test() expects a file path, so persist the upload to a temp file first.
    image_path = "temp_image.jpg"
    image.save(image_path)
    # Run the existing model-prediction function and hand its text back to the UI.
    return test(image_path)
# Gradio UI: image upload on the left, text verdict on the right.
iface = gr.Interface(
    fn=process_image,
    inputs=gr.Image(type="pil"),  # image input
    outputs="text",               # text output
    title="猫狗识别",
    description="上传图片,查看模型的预测结果。",
)

# Start the Gradio app.
iface.launch()